This commit is contained in:
2024-11-03 11:36:58 +03:00
parent 672bb1ff6d
commit edf90b39c4
79 changed files with 233 additions and 3830 deletions
@@ -21,4 +21,8 @@
<ProjectReference Include="..\Ragon.Server\Ragon.Server.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Sources\" />
</ItemGroup>
</Project>
@@ -1,76 +0,0 @@
/*
* Copyright 2023-2024 Eduard Kargin <kargin.eduard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Threading.Channels;
namespace Ragon.Server.IO;
public class Executor : TaskScheduler, IExecutor
{
private readonly ChannelReader<Task> _reader;
private readonly ChannelWriter<Task> _writer;
private readonly Queue<Task> _pendingTasks;
private readonly TaskFactory _taskFactory;
public Task Run(Action action, TaskCreationOptions task = TaskCreationOptions.None)
{
return _taskFactory.StartNew(action, task);
}
public Executor()
{
var channel = Channel.CreateUnbounded<Task>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = true,
});
_reader = channel.Reader;
_writer = channel.Writer;
_taskFactory = new TaskFactory(this);
_pendingTasks = new Queue<Task>();
}
protected override IEnumerable<Task>? GetScheduledTasks()
{
throw new NotSupportedException();
}
protected override void QueueTask(Task task)
{
_writer.TryWrite(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
public void Update()
{
while (_reader.TryRead(out var task))
{
TryExecuteTask(task);
if (task.Status == TaskStatus.Running)
_pendingTasks.Enqueue(task);
}
while (_pendingTasks.TryDequeue(out var task))
_writer.TryWrite(task);
}
}
@@ -1,22 +0,0 @@
/*
* Copyright 2023-2024 Eduard Kargin <kargin.eduard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Ragon.Server.IO;
public interface IExecutor
{
public Task Run(Action action, TaskCreationOptions task = TaskCreationOptions.None);
}
@@ -1,66 +0,0 @@
/*
* Copyright 2023-2024 Eduard Kargin <kargin.eduard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Net.WebSockets;
using Ragon.Server.IO;
using Ragon.Server.Logging;
namespace Ragon.Server.WebSocketServer;
public sealed class WebSocketConnection : INetworkConnection
{
private readonly IRagonLogger _logger = LoggerManager.GetLogger(nameof(WebSocketConnection));
public ushort Id { get; }
public INetworkChannel Reliable { get; private set; }
public INetworkChannel Unreliable { get; private set; }
public WebSocket Socket { get; private set; }
private WebSocketReliableChannel[] _channels;
public WebSocketConnection(WebSocket webSocket, ushort peerId)
{
Id = peerId;
Socket = webSocket;
var reliableChannel = new WebSocketReliableChannel(webSocket);
var unreliableChannel = new WebSocketReliableChannel(webSocket);
_channels = new[] { reliableChannel, unreliableChannel };
Reliable = reliableChannel;
Unreliable = unreliableChannel;
}
public void Close()
{
Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);
}
public async Task Flush()
{
foreach (var channel in _channels)
{
try
{
await channel.Flush();
}
catch (Exception ex)
{
_logger.Error(ex);
}
}
}
}
@@ -1,52 +0,0 @@
/*
* Copyright 2023-2024 Eduard Kargin <kargin.eduard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Net.WebSockets;
using Ragon.Protocol;
using Ragon.Server.IO;
namespace Ragon.Server.WebSocketServer;
public class WebSocketReliableChannel : INetworkChannel
{
private Queue<byte[]> _queue;
private WebSocket _socket;
public WebSocketReliableChannel(WebSocket webSocket)
{
_socket = webSocket;
_queue = new Queue<byte[]>(512);
}
public void Send(byte[] data)
{
_queue.Enqueue(data);
}
public void Send(RagonBuffer buffer)
{
var sendData = buffer.ToArray();
_queue.Enqueue(sendData);
}
public async Task Flush()
{
while (_queue.TryDequeue(out var sendData) && _socket.State == WebSocketState.Open)
{
await _socket.SendAsync(sendData, WebSocketMessageType.Binary, WebSocketMessageFlags.EndOfMessage, CancellationToken.None);
}
}
}
@@ -1,175 +0,0 @@
/*
* Copyright 2023-2024 Eduard Kargin <kargin.eduard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Buffers;
using System.Net;
using System.Net.WebSockets;
using Ragon.Protocol;
using Ragon.Server.IO;
using Ragon.Server.Logging;
namespace Ragon.Server.WebSocketServer;
public class WebSocketServer : INetworkServer
{
private readonly IRagonLogger _logger = LoggerManager.GetLogger(nameof(WebSocketServer));
private INetworkListener _networkListener;
private Stack<ushort> _sequencer;
private Executor _executor;
private HttpListener _httpListener;
private WebSocketConnection[] _connections;
private List<WebSocketConnection> _activeConnections;
private CancellationTokenSource _cancellationTokenSource;
public WebSocketServer()
{
_sequencer = new Stack<ushort>();
_connections = Array.Empty<WebSocketConnection>();
_activeConnections = new List<WebSocketConnection>();
_executor = new Executor();
}
public async void StartAccept(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
WebSocketConnection connection = null!;
try
{
var context = await _httpListener.GetContextAsync();
if (!context.Request.IsWebSocketRequest)
{
context.Response.StatusCode = 200;
context.Response.ContentLength64 = 0;
context.Response.Close();
continue;
}
var webSocketContext = await context.AcceptWebSocketAsync(null);
var webSocket = webSocketContext.WebSocket;
var peerId = _sequencer.Pop();
connection = new WebSocketConnection(webSocket, peerId);
}
catch (Exception ex)
{
_logger.Error(ex);
continue;
}
_connections[connection.Id] = connection;
StartListen(connection, cancellationToken);
}
}
async void StartListen(WebSocketConnection connection, CancellationToken cancellationToken)
{
_activeConnections.Add(connection);
_networkListener.OnConnected(connection);
var webSocket = connection.Socket;
var rawData = new byte[2048];
var rawDataBuffer = new Memory<byte>(rawData);
var data = new ArrayBufferWriter<byte>();
while (webSocket.State == WebSocketState.Open || !cancellationToken.IsCancellationRequested)
{
try
{
while (true)
{
var result = await webSocket.ReceiveAsync(rawDataBuffer, cancellationToken);
var payload = rawDataBuffer.Slice(0, result.Count);
data.Write(payload.Span);
if (result.EndOfMessage)
break;
}
if (data.WrittenCount > 0)
{
_networkListener.OnData(connection, NetworkChannel.RELIABLE, data.WrittenMemory.ToArray());
data.Clear();
}
}
catch (Exception ex)
{
break;
}
}
_sequencer.Push(connection.Id);
_activeConnections.Remove(connection);
_networkListener.OnDisconnected(connection);
}
public void Update()
{
_executor.Update();
Flush();
}
public void Broadcast(byte[] data, NetworkChannel channel)
{
foreach (var activeConnection in _activeConnections)
activeConnection.Reliable.Send(data);
}
public async void Flush()
{
foreach (var conn in _activeConnections)
await conn.Flush();
}
public void Listen(
INetworkListener listener,
NetworkConfiguration configuration
)
{
_networkListener = listener;
_cancellationTokenSource = new CancellationTokenSource();
var limit = (ushort)configuration.LimitConnections;
for (ushort i = limit; i != 0; i--)
_sequencer.Push(i);
_sequencer.Push(0);
_connections = new WebSocketConnection[configuration.LimitConnections];
_httpListener = new HttpListener();
_httpListener.Prefixes.Add($"http://{configuration.Address}:{configuration.Port}/");
_httpListener.Start();
_executor.Run(() => StartAccept(_cancellationTokenSource.Token));
var protocolDecoded = RagonVersion.Parse(configuration.Protocol);
_logger.Info($"Listen at http://{configuration.Address}:{configuration.Port}/");
_logger.Info($"Protocol: {protocolDecoded}");
}
public void Stop()
{
_cancellationTokenSource.Cancel();
_httpListener.Stop();
}
}