fixed: websocket server wrong execution thread

This commit is contained in:
2022-12-25 03:13:01 -08:00
parent f34b05e6ff
commit e9418f4b22
10 changed files with 110 additions and 109 deletions
+2 -2
View File
@@ -37,7 +37,7 @@ public class Application : INetworkListener
if (configuration.ServerType == "websocket") if (configuration.ServerType == "websocket")
_server = new NativeWebSocketServer(_executor); _server = new NativeWebSocketServer(_executor);
Debug.Assert(_server != null, $"Socket type not supported: {configuration.ServerType}. Supported: [enet, websocket]"); Debug.Assert(_server != null, $"Socket type not supported: {configuration.ServerType}. Supported: [enet, websocket]");
} }
@@ -48,7 +48,7 @@ public class Application : INetworkListener
_executor.Execute(); _executor.Execute();
_loop.Tick(); _loop.Tick();
_server.Poll(); _server.Poll();
Thread.Sleep((int)1000.0f / _configuration.ServerTickRate); Thread.Sleep((int)1000.0f / _configuration.ServerTickRate);
} }
} }
+2 -3
View File
@@ -3,9 +3,8 @@ using Ragon.Common;
namespace Ragon.Core.Game; namespace Ragon.Core.Game;
public class EntityState public class EntityState
{ {
private Logger _logger = LogManager.GetCurrentClassLogger();
private List<EntityStateProperty> _properties; private List<EntityStateProperty> _properties;
private Entity _entity; private Entity _entity;
@@ -23,7 +22,7 @@ public class EntityState
public void Write(RagonSerializer serializer) public void Write(RagonSerializer serializer)
{ {
serializer.WriteUShort(_entity.Id); serializer.WriteUShort(_entity.Id);
for (int propertyIndex = 0; propertyIndex < _properties.Count; propertyIndex++) for (int propertyIndex = 0; propertyIndex < _properties.Count; propertyIndex++)
{ {
var property = _properties[propertyIndex]; var property = _properties[propertyIndex];
+2 -2
View File
@@ -24,8 +24,8 @@ public class EntityStateProperty
public ReadOnlySpan<byte> Read() public ReadOnlySpan<byte> Read()
{ {
var dataSpan = _data.AsSpan(); var dataSpan = _data.AsSpan();
var src = dataSpan.Slice(0, Size);
return dataSpan.Slice(0, Size); return src;
} }
public void Write(ref ReadOnlySpan<byte> src) public void Write(ref ReadOnlySpan<byte> src)
@@ -14,6 +14,7 @@ public sealed class EntityStateHandler: IHandler
for (var entityIndex = 0; entityIndex < entitiesCount; entityIndex++) for (var entityIndex = 0; entityIndex < entitiesCount; entityIndex++)
{ {
var entityId = reader.ReadUShort(); var entityId = reader.ReadUShort();
if (room.Entities.TryGetValue(entityId, out var entity)) if (room.Entities.TryGetValue(entityId, out var entity))
{ {
entity.State.Read(reader); entity.State.Read(reader);
@@ -76,6 +76,6 @@ public sealed class JoinOrCreateHandler : IHandler
var sendData = writer.ToArray(); var sendData = writer.ToArray();
player.Connection.Reliable.Send(sendData); player.Connection.Reliable.Send(sendData);
_logger.Trace($"Joined to room {room.Id}"); _logger.Trace($"{player.Connection.Id}|{player.Name} joined to room {room.Id}");
} }
} }
+2 -1
View File
@@ -36,10 +36,11 @@ public sealed class SceneLoadedHandler : IHandler
entity.State.AddProperty(new EntityStateProperty(propertySize, propertyType)); entity.State.AddProperty(new EntityStateProperty(propertySize, propertyType));
} }
_logger.Trace($"Player {context.Connection.Id}|{context.LobbyPlayer.Name} created entity {entity.Id}:{entity.Type}");
room.AttachEntity(player, entity); room.AttachEntity(player, entity);
} }
_logger.Trace($"Player {context.Connection.Id}|{context.LobbyPlayer.Name} loaded with {statics} scene entities"); _logger.Trace($"Player {context.Connection.Id}|{context.LobbyPlayer.Name} loaded");
room.WaitPlayersList.Add(player); room.WaitPlayersList.Add(player);
+1 -1
View File
@@ -35,7 +35,7 @@ namespace Ragon.Server.ENet
_host.Create(address, _connections.Length, 2, 0, 0, 1024 * 1024); _host.Create(address, _connections.Length, 2, 0, 0, 1024 * 1024);
var protocolDecoded = RagonVersion.Parse(_protocol); var protocolDecoded = RagonVersion.Parse(_protocol);
_logger.Info($"Network listening on {configuration.Port}"); _logger.Info($"Listen at 127.0.0.1:{configuration.Port}");
_logger.Info($"Protocol: {protocolDecoded}"); _logger.Info($"Protocol: {protocolDecoded}");
} }
@@ -13,109 +13,112 @@ namespace Ragon.Core;
public class NativeWebSocketServer : INetworkServer public class NativeWebSocketServer : INetworkServer
{ {
private ILogger _logger = LogManager.GetCurrentClassLogger(); private ILogger _logger = LogManager.GetCurrentClassLogger();
private INetworkListener _networkListener; private INetworkListener _networkListener;
private Stack<ushort> _sequencer; private Stack<ushort> _sequencer;
private Executor _executor; private Executor _executor;
private HttpListener _httpListener; private HttpListener _httpListener;
private WebSocketConnection[] _connections; private WebSocketConnection[] _connections;
private ushort _lastPeerId; private List<WebSocketConnection> _activeConnections;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
public NativeWebSocketServer(Executor executor) public NativeWebSocketServer(Executor executor)
{
_sequencer = new Stack<ushort>();
_connections = Array.Empty<WebSocketConnection>();
_activeConnections = new List<WebSocketConnection>();
_executor = executor;
}
public async void StartAccept(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{ {
_sequencer = new Stack<ushort>(); var context = await _httpListener.GetContextAsync();
_connections = Array.Empty<WebSocketConnection>(); if (!context.Request.IsWebSocketRequest) continue;
_executor = executor;
var webSocketContext = await context.AcceptWebSocketAsync(null);
var webSocket = webSocketContext.WebSocket;
var peerId = _sequencer.Pop();
var connection = new WebSocketConnection(webSocket, peerId);
_connections[peerId] = connection;
StartListen(connection, cancellationToken);
} }
}
public async Task StartAccept(CancellationToken cancellationToken) async void StartListen(WebSocketConnection connection, CancellationToken cancellationToken)
{
_activeConnections.Add(connection);
_networkListener.OnConnected(connection);
var webSocket = connection.Socket;
var bytes = new byte[2048];
var buffer = new Memory<byte>(bytes);
while (
webSocket.State == WebSocketState.Open ||
!cancellationToken.IsCancellationRequested)
{ {
while (!cancellationToken.IsCancellationRequested) try
{ {
var context = await _httpListener.GetContextAsync(); var result = await webSocket.ReceiveAsync(buffer, cancellationToken);
if (!context.Request.IsWebSocketRequest) continue; var dataRaw = buffer.Slice(0, result.Count);
var webSocketContext = await context.AcceptWebSocketAsync(null);
var webSocket = webSocketContext.WebSocket;
var peerId = _sequencer.Pop();
var connection = new WebSocketConnection(webSocket, peerId);
_lastPeerId = peerId;
_connections[peerId] = connection;
_networkListener.OnConnected(connection);
_executor.Run(StartListen(connection, cancellationToken));
}
}
async Task StartListen(WebSocketConnection connection, CancellationToken cancellationToken)
{
var webSocket = connection.Socket;
var bytes = new byte[2048];
var buffer = new Memory<byte>(bytes);
while (
webSocket.State == WebSocketState.Open ||
!cancellationToken.IsCancellationRequested)
{
try
{
var result = await webSocket.ReceiveAsync(buffer, cancellationToken);
var dataRaw = buffer.Slice(0, result.Count);
_networkListener.OnData(connection, dataRaw.ToArray());
}
catch (Exception ex)
{
break;
}
}
_sequencer.Push(connection.Id); _networkListener.OnData(connection, dataRaw.ToArray());
_networkListener.OnDisconnected(connection); }
catch (Exception ex)
{
break;
}
} }
public async void Poll() _sequencer.Push(connection.Id);
{ _activeConnections.Remove(connection);
foreach (var conn in _connections) _networkListener.OnDisconnected(connection);
{ }
if (conn != null)
{
await conn.Flush();
}
}
}
public void Start( public void Poll()
INetworkListener listener, {
NetworkConfiguration configuration Flush();
) }
{
_networkListener = listener;
_cancellationTokenSource = new CancellationTokenSource();
var limit = (ushort) configuration.LimitConnections; public async void Flush()
for (ushort i = limit; i != 0; i--) {
_sequencer.Push(i); foreach (var conn in _activeConnections)
await conn.Flush();
_sequencer.Push(0); }
_connections = new WebSocketConnection[configuration.LimitConnections]; public void Start(
INetworkListener listener,
_httpListener = new HttpListener(); NetworkConfiguration configuration
_httpListener.Prefixes.Add($"http://127.0.0.1:{configuration.Port}/"); )
_httpListener.Start(); {
_networkListener = listener;
_cancellationTokenSource = new CancellationTokenSource();
_executor.Run(StartAccept(_cancellationTokenSource.Token)); var limit = (ushort)configuration.LimitConnections;
for (ushort i = limit; i != 0; i--)
_sequencer.Push(i);
var protocolDecoded = RagonVersion.Parse(configuration.Protocol); _sequencer.Push(0);
_logger.Info($"Network listening on http://*:{configuration.Port}/");
_logger.Info($"Protocol: {protocolDecoded}");
}
public void Stop() _connections = new WebSocketConnection[configuration.LimitConnections];
{
_cancellationTokenSource.Cancel(); _httpListener = new HttpListener();
_httpListener.Stop(); _httpListener.Prefixes.Add($"http://127.0.0.1:{configuration.Port}/");
} _httpListener.Start();
_executor.Run(() => StartAccept(_cancellationTokenSource.Token));
var protocolDecoded = RagonVersion.Parse(configuration.Protocol);
_logger.Info($"Listen at http://*:{configuration.Port}/");
_logger.Info($"Protocol: {protocolDecoded}");
}
public void Stop()
{
_cancellationTokenSource.Cancel();
_httpListener.Stop();
}
} }
+2 -2
View File
@@ -9,9 +9,9 @@ public class Executor: TaskScheduler
private Queue<Task> _pendingTasks; private Queue<Task> _pendingTasks;
private TaskFactory _taskFactory; private TaskFactory _taskFactory;
public void Run(Task task) public void Run(Action action)
{ {
_taskFactory.StartNew(() => task); _taskFactory.StartNew(action);
} }
public Executor() public Executor()
+3 -6
View File
@@ -6,20 +6,18 @@
Ragon is fully free, small and high perfomance room based game server with plugin based architecture. Ragon is fully free, small and high perfomance room based game server with plugin based architecture.
<a href="https://ragon-server.com/docs/category/basics">Documentation</a> <a href="http://localhost:3000/docs/installation">Documentation</a>
<br>
<a href="https://ragon-server.com/docs/get-started">Get started</a>
### Features: ### Features:
- Effective - Effective
- Free - Free
- Simple matchmaking - Lobby
- Room based architecture - Room based architecture
- Сustomizable authorization - Сustomizable authorization
- Сustomizable server-side logic via plugins with flexible API*(2) - Сustomizable server-side logic via plugins with flexible API*(2)
- No CCU limitations*(1) - No CCU limitations*(1)
- No Room count limitations - No Room count limitations
- Reliable UDP - Reliable UDP
### Requirements ### Requirements
- OSX, Windows, Linux(Ubuntu, Debian) - OSX, Windows, Linux(Ubuntu, Debian)
@@ -33,5 +31,4 @@ MIT
### Tips ### Tips
\* Limited to 4095 CCU by library ENet-Sharp (1) \* Limited to 4095 CCU by library ENet-Sharp (1)
\* Non finally (2) \* Non finally (2)