feat: websockets

This commit is contained in:
2022-12-20 12:20:52 -08:00
parent ab85578ccf
commit a5a67963be
24 changed files with 276 additions and 69 deletions
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>Ragon.WebSockets</RootNamespace>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Ragon.Server\Ragon.Server.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="NLog" Version="5.0.5" />
</ItemGroup>
</Project>
@@ -0,0 +1,44 @@
using System.Net.WebSockets;
using NLog;
namespace Ragon.Server.NativeWebSockets;
public sealed class WebSocketConnection : INetworkConnection
{
private Logger _logger = LogManager.GetCurrentClassLogger();
public ushort Id { get; }
public INetworkChannel Reliable { get; private set; }
public INetworkChannel Unreliable { get; private set; }
public WebSocket Socket { get; private set; }
private WebSocketReliableChannel[] _channels;
public WebSocketConnection(WebSocket webSocket, ushort peerId)
{
Id = peerId;
Socket = webSocket;
var reliableChannel = new WebSocketReliableChannel(webSocket);
var unreliableChannel = new WebSocketReliableChannel(webSocket);
_channels = new[] { reliableChannel, unreliableChannel };
Reliable = reliableChannel;
Unreliable = unreliableChannel;
}
public async Task Flush()
{
foreach (var channel in _channels)
{
try
{
await channel.Flush();
}
catch (Exception ex)
{
_logger.Error(ex);
}
}
}
}
@@ -0,0 +1,27 @@
using System.Net.WebSockets;
using Ragon.Server;
namespace Ragon.Server.NativeWebSockets;
public class WebSocketReliableChannel : INetworkChannel
{
private Queue<byte[]> _queue;
private WebSocket _socket;
public WebSocketReliableChannel(WebSocket webSocket)
{
_socket = webSocket;
_queue = new Queue<byte[]>(512);
}
public void Send(byte[] data)
{
_queue.Enqueue(data);
}
public async Task Flush()
{
while (_queue.TryDequeue(out var sendData) && _socket.State == WebSocketState.Open)
await _socket.SendAsync(sendData, WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage, CancellationToken.None);
}
}
@@ -0,0 +1,121 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.WebSockets;
using NLog;
using Ragon.Common;
using Ragon.Core.Server;
using Ragon.Server;
using Ragon.Server.NativeWebSockets;
namespace Ragon.Core;
public class NativeWebSocketServer : INetworkServer
{
private ILogger _logger = LogManager.GetCurrentClassLogger();
private INetworkListener _networkListener;
private Stack<ushort> _sequencer;
private Executor _executor;
private HttpListener _httpListener;
private WebSocketConnection[] _connections;
private ushort _lastPeerId;
private CancellationTokenSource _cancellationTokenSource;
public NativeWebSocketServer(Executor executor)
{
_sequencer = new Stack<ushort>();
_connections = Array.Empty<WebSocketConnection>();
_executor = executor;
}
public async Task StartAccept(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var context = await _httpListener.GetContextAsync();
if (!context.Request.IsWebSocketRequest) continue;
var webSocketContext = await context.AcceptWebSocketAsync(null);
var webSocket = webSocketContext.WebSocket;
var peerId = _sequencer.Pop();
var connection = new WebSocketConnection(webSocket, peerId);
_lastPeerId = peerId;
_connections[peerId] = connection;
_networkListener.OnConnected(connection);
_executor.Run(StartListen(connection, cancellationToken));
}
}
async Task StartListen(WebSocketConnection connection, CancellationToken cancellationToken)
{
var webSocket = connection.Socket;
var bytes = new byte[2048];
var buffer = new Memory<byte>(bytes);
while (
webSocket.State == WebSocketState.Open ||
!cancellationToken.IsCancellationRequested)
{
try
{
var result = await webSocket.ReceiveAsync(buffer, cancellationToken);
var dataRaw = buffer.Slice(0, result.Count);
_networkListener.OnData(connection, dataRaw.ToArray());
}
catch (Exception ex)
{
break;
}
}
_sequencer.Push(connection.Id);
_networkListener.OnDisconnected(connection);
}
public async void Poll()
{
foreach (var conn in _connections)
{
if (conn != null)
{
await conn.Flush();
}
}
}
public void Start(
INetworkListener listener,
NetworkConfiguration configuration
)
{
_networkListener = listener;
_cancellationTokenSource = new CancellationTokenSource();
var limit = (ushort) configuration.LimitConnections;
for (ushort i = limit; i != 0; i--)
_sequencer.Push(i);
_sequencer.Push(0);
_connections = new WebSocketConnection[configuration.LimitConnections];
_httpListener = new HttpListener();
_httpListener.Prefixes.Add($"http://127.0.0.1:{configuration.Port}/");
_httpListener.Start();
_executor.Run(StartAccept(_cancellationTokenSource.Token));
var protocolDecoded = RagonVersion.Parse(configuration.Protocol);
_logger.Info($"Network listening on http://*:{configuration.Port}/");
_logger.Info($"Protocol: {protocolDecoded}");
}
public void Stop()
{
_cancellationTokenSource.Cancel();
_httpListener.Stop();
}
}