/*
 * Copyright 2023-2024 Eduard Kargin
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Buffers;
using System.Net;
using System.Net.WebSockets;
using Ragon.Protocol;
using Ragon.Server.IO;
using Ragon.Server.Logging;

namespace Ragon.Server.WebSocketServer;

/// <summary>
/// <see cref="INetworkServer"/> implementation that accepts WebSocket clients through an
/// <see cref="HttpListener"/> and forwards connection events and complete messages to an
/// <see cref="INetworkListener"/>.
/// </summary>
public class WebSocketServer : INetworkServer
{
    /// <summary>Size in bytes of the scratch buffer used for a single receive call.</summary>
    private const int ReceiveBufferSize = 2048;

    private readonly IRagonLogger _logger = LoggerManager.GetLogger(nameof(WebSocketServer));

    // Pool of free peer ids; an id is popped on accept and pushed back on disconnect.
    private readonly Stack<ushort> _sequencer;
    private readonly Executor _executor;
    private readonly List<WebSocketConnection> _activeConnections;

    private INetworkListener _networkListener;
    private HttpListener _httpListener;
    private WebSocketConnection[] _connections;
    private CancellationTokenSource _cancellationTokenSource;

    /// <summary>Creates a server with no connections; call <see cref="Listen"/> to start it.</summary>
    public WebSocketServer()
    {
        _sequencer = new Stack<ushort>();
        _connections = Array.Empty<WebSocketConnection>();
        _activeConnections = new List<WebSocketConnection>();
        _executor = new Executor();
    }

    /// <summary>
    /// Accept loop: waits for HTTP requests, upgrades WebSocket requests to connections and
    /// starts a receive loop for each of them.
    /// </summary>
    /// <param name="cancellationToken">Token that ends the loop once cancellation is requested.</param>
    public async ValueTask StartAccept(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            WebSocketConnection connection = null!;
            ushort peerId = 0;
            var peerIdReserved = false;

            try
            {
                var context = await _httpListener.GetContextAsync();
                if (!context.Request.IsWebSocketRequest)
                {
                    // Plain HTTP requests get an empty 200 response.
                    context.Response.StatusCode = 200;
                    context.Response.ContentLength64 = 0;
                    context.Response.Close();
                    continue;
                }

                // Reserve an id BEFORE upgrading, so an exhausted pool rejects the request
                // instead of leaking an already-accepted socket.
                if (!_sequencer.TryPop(out peerId))
                {
                    context.Response.StatusCode = 503;
                    context.Response.ContentLength64 = 0;
                    context.Response.Close();
                    continue;
                }

                peerIdReserved = true;

                var webSocketContext = await context.AcceptWebSocketAsync(null);
                connection = new WebSocketConnection(webSocketContext.WebSocket, peerId);
            }
            catch (Exception ex)
            {
                // Give the id back if the upgrade failed after it was reserved.
                if (peerIdReserved)
                {
                    _sequencer.Push(peerId);
                }

                // Stopping the listener makes the pending accept throw; that is not an error.
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                _logger.Error(ex);
                continue;
            }

            _connections[connection.Id] = connection;
            _ = StartListen(connection, cancellationToken);
        }
    }

    /// <summary>
    /// Receive loop for one connection: reassembles fragmented messages and passes each complete,
    /// non-empty message to the listener. Always releases the peer id and reports the
    /// disconnect when the loop ends.
    /// </summary>
    /// <param name="connection">Connection whose socket is read.</param>
    /// <param name="cancellationToken">Token that ends the loop once cancellation is requested.</param>
    private async ValueTask StartListen(WebSocketConnection connection, CancellationToken cancellationToken)
    {
        _activeConnections.Add(connection);

        try
        {
            _networkListener.OnConnected(connection);

            var webSocket = connection.Socket;
            var receiveBuffer = new Memory<byte>(new byte[ReceiveBufferSize]);
            var message = new ArrayBufferWriter<byte>();

            while (webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
            {
                var closeReceived = false;

                // A message may arrive in several frames; accumulate until EndOfMessage.
                while (true)
                {
                    var result = await webSocket.ReceiveAsync(receiveBuffer, cancellationToken);
                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        closeReceived = true;
                        break;
                    }

                    message.Write(receiveBuffer.Slice(0, result.Count).Span);

                    if (result.EndOfMessage)
                    {
                        break;
                    }
                }

                if (closeReceived)
                {
                    break;
                }

                if (message.WrittenCount > 0)
                {
                    _networkListener.OnData(connection, NetworkChannel.RELIABLE, message.WrittenMemory.ToArray());
                    message.Clear();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested while receiving: normal shutdown path.
        }
        catch (Exception ex)
        {
            _logger.Error(ex);
        }
        finally
        {
            _sequencer.Push(connection.Id);
            _activeConnections.Remove(connection);
            _connections[connection.Id] = null!;
            _networkListener.OnDisconnected(connection);
        }
    }

    /// <summary>Updates the executor and starts a flush of all active connections.</summary>
    public void Update()
    {
        _executor.Update();
        _ = Flush();
    }

    /// <summary>Queues <paramref name="data"/> on the reliable channel of every active connection.</summary>
    /// <param name="data">Payload to send.</param>
    /// <param name="channel">Not used by this implementation; data always goes through <c>Reliable</c>.</param>
    public void Broadcast(byte[] data, NetworkChannel channel)
    {
        foreach (var activeConnection in _activeConnections)
        {
            activeConnection.Reliable.Send(data);
        }
    }

    /// <summary>Flushes every active connection, one after another.</summary>
    public async ValueTask Flush()
    {
        // Iterate a snapshot: the list can change while a flush is awaited, which would
        // invalidate a live enumerator.
        var snapshot = _activeConnections.ToArray();
        foreach (var conn in snapshot)
        {
            await conn.Flush();
        }
    }

    /// <summary>
    /// Prepares the peer id pool, starts the HTTP listener on the configured address and port
    /// and hands the accept loop to the executor.
    /// </summary>
    /// <param name="listener">Receiver of connection, data and disconnection callbacks.</param>
    /// <param name="configuration">Supplies address, port, connection limit and protocol version.</param>
    public void Listen(
        INetworkListener listener,
        NetworkConfiguration configuration
    )
    {
        _networkListener = listener;
        _cancellationTokenSource = new CancellationTokenSource();

        // Ids are 0..limit-1 so each one is a valid index into _connections; pushing in
        // descending order leaves 0 on top of the stack.
        var limit = (ushort)configuration.LimitConnections;
        for (var id = limit - 1; id >= 0; id--)
        {
            _sequencer.Push((ushort)id);
        }

        _connections = new WebSocketConnection[configuration.LimitConnections];

        var endpoint = $"http://{configuration.Address}:{configuration.Port}/";

        _httpListener = new HttpListener();
        _httpListener.Prefixes.Add(endpoint);
        _httpListener.Start();

        _executor.Run(() => StartAccept(_cancellationTokenSource.Token));

        var protocolDecoded = RagonVersion.Parse(configuration.Protocol);
        _logger.Info($"Listen at {endpoint}");
        _logger.Info($"Protocol: {protocolDecoded}");
    }

    /// <summary>Requests cancellation of the running loops and stops the HTTP listener.</summary>
    public void Stop()
    {
        // Both are null until Listen() has been called.
        _cancellationTokenSource?.Cancel();
        _httpListener?.Stop();
    }
}