Skip to content

Commit b7bf89a

Browse files
committed
feat: improve data receiving
1 parent 0e54e32 commit b7bf89a

10 files changed

Lines changed: 114 additions & 101 deletions

File tree

SimpleNetworkManager.NET.Tests/Messages/MessageTests.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@ public void TestMessage_Serialize_Deserialize()
1818
// Act - Serialize using custom binary format
1919
var serializedData = originalMessage.Serialize();
2020

21+
// Find if data length is correct or not
22+
int comparingLength = BitConverter.ToInt32(serializedData, 0);
23+
Assert.Equal(serializedData.Length, comparingLength);
24+
2125
// Extract message data and verify header
22-
var messageData = BaseMessage.ExtractMessageData(serializedData, out uint messageType);
26+
var messageData = BaseMessage.ExtractMessageData(serializedData, serializedData.Length, out uint messageType);
2327
Assert.Equal(originalMessage.GetMessageType(), messageType);
2428

2529
// Deserialize the message data using MessagePack

SimpleNetworkManager.NET.Tests/Network/StreamExtensionsTests.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Insthync.SimpleNetworkManager.NET.Network;
33
using Insthync.SimpleNetworkManager.NET.Tests.Messages;
44
using MessagePack;
5+
using System.Buffers;
56

67
namespace Insthync.SimpleNetworkManager.NET.Tests.Network
78
{
@@ -20,10 +21,11 @@ public async Task ReadMessage_Test()
2021
var cancellationTokenSource = new CancellationTokenSource();
2122
await stream.WriteMessageAsync(originalMessage, cancellationTokenSource.Token);
2223
stream.Position = 0;
23-
var message = await stream.ReadMessageAsync(cancellationTokenSource.Token);
24-
Assert.NotNull(message);
24+
var result = await stream.ReadMessageAsync(cancellationTokenSource.Token);
25+
Assert.NotNull(result);
2526

26-
var messageData = BaseMessage.ExtractMessageData(message, out var messageType);
27+
var messageData = BaseMessage.ExtractMessageData(result.Value.buffer, result.Value.length, out var messageType);
28+
ArrayPool<byte>.Shared.Return(result.Value.buffer);
2729
Assert.Equal(originalMessage.GetMessageType(), messageType);
2830

2931
// Deserialize the message data using MessagePack

SimpleNetworkManager.NET/Messages/BaseMessage.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,21 @@ public byte[] Serialize()
6262
/// <param name="message">Binary data containing the message</param>
6363
/// <param name="messageType">The message type extracted from the header</param>
6464
/// <returns>Deserialized message data (without header)</returns>
65-
public static byte[] ExtractMessageData(byte[] message, out uint messageType)
65+
public static byte[] ExtractMessageData(byte[] message, int messageLength, out uint messageType)
6666
{
67-
if (message.Length < 8)
67+
if (messageLength < 8)
6868
throw new ArgumentException("Data too short to contain message header");
6969

7070
// Read total size (for validation)
7171
var totalSize = BitConverter.ToInt32(message, 0);
72-
if (totalSize != message.Length)
72+
if (totalSize != messageLength)
7373
throw new ArgumentException("Message size mismatch");
7474

7575
// Read message type
7676
messageType = BitConverter.ToUInt32(message, 4);
7777

7878
// Extract message data
79-
var messageData = new byte[message.Length - 8];
79+
var messageData = new byte[messageLength - 8];
8080
Array.Copy(message, 8, messageData, 0, messageData.Length);
8181

8282
return messageData;

SimpleNetworkManager.NET/Network/BaseClientConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ public void UnassignConnectionId()
6565
ConnectionId = 0;
6666
}
6767

68-
public void OnMessageReceived(byte[] message)
68+
public void OnMessageReceived(byte[] buffer, int length)
6969
{
70-
MessageReceived?.Invoke(this, message);
70+
MessageReceived?.Invoke(this, buffer, length);
7171
}
7272

7373
public void OnDisconnected()

SimpleNetworkManager.NET/Network/BaseNetworkClient.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,16 @@ protected virtual void OnClientDisconnected(BaseClientConnection clientConnectio
9494
/// Event handler for messages received from clients
9595
/// </summary>
9696
/// <param name="clientConnection">The client connection that sent the message</param>
97-
/// <param name="message">Received message</param>
98-
protected virtual async void OnClientMessageReceived(BaseClientConnection clientConnection, byte[] message)
97+
/// <param name="buffer">Message buffer</param>
98+
/// <param name="length">Length of buffer</param>
99+
protected virtual async void OnClientMessageReceived(BaseClientConnection clientConnection, byte[] buffer, int length)
99100
{
100101
try
101102
{
102103
_logger.LogDebug("Received message from server");
103104

104105
// Route the message to the appropriate handler
105-
await _messageRouterService.RouteMessageAsync(clientConnection, message);
106+
await _messageRouterService.RouteMessageAsync(clientConnection, buffer, length);
106107
}
107108
catch (Exception ex)
108109
{

SimpleNetworkManager.NET/Network/BaseNetworkServer.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,16 @@ protected virtual void OnClientDisconnected(BaseClientConnection clientConnectio
114114
/// Event handler for messages received from clients
115115
/// </summary>
116116
/// <param name="clientConnection">The client connection that sent the message</param>
117-
/// <param name="message">Received message</param>
118-
protected virtual async void OnClientMessageReceived(BaseClientConnection clientConnection, byte[] message)
117+
/// <param name="buffer">Message buffer</param>
118+
/// <param name="length">Length of buffer</param>
119+
protected virtual async void OnClientMessageReceived(BaseClientConnection clientConnection, byte[] buffer, int length)
119120
{
120121
try
121122
{
122123
_logger.LogDebug("Received message from client {ConnectionId}", clientConnection.ConnectionId);
123124

124125
// Route the message to the appropriate handler
125-
await _messageRouterService.RouteMessageAsync(clientConnection, message);
126+
await _messageRouterService.RouteMessageAsync(clientConnection, buffer, length);
126127
}
127128
catch (Exception ex)
128129
{
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
namespace Insthync.SimpleNetworkManager.NET.Network
22
{
3-
public delegate void MessageReceivedHandler(BaseClientConnection clientConnection, byte[] message);
3+
public delegate void MessageReceivedHandler(BaseClientConnection clientConnection, byte[] buffer, int length);
44
public delegate void DisconnectedHandler(BaseClientConnection clientConnection);
55
}

SimpleNetworkManager.NET/Network/StreamExtensions.cs

Lines changed: 78 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Cysharp.Threading.Tasks;
22
using Insthync.SimpleNetworkManager.NET.Messages;
33
using System;
4+
using System.Buffers;
45
using System.IO;
56
using System.Threading;
67

@@ -11,134 +12,134 @@ public static class StreamExtensions
1112
/// <summary>
1213
/// Reads exactly the specified number of bytes from the stream
1314
/// </summary>
14-
public static async UniTask<int> ReadExactAsync(this Stream stream, byte[] buffer, int count, CancellationToken cancellationToken, int offset = 0)
15+
public static async UniTask<int> ReadExactAsync(this Stream stream, Memory<byte> buffer, int count, CancellationToken cancellationToken)
1516
{
16-
if (stream == null)
17-
return 0;
18-
1917
int totalBytesRead = 0;
2018
while (totalBytesRead < count && !cancellationToken.IsCancellationRequested)
2119
{
22-
var bytesRead = await stream.ReadAsync(
23-
buffer, offset + totalBytesRead, count - totalBytesRead, cancellationToken);
24-
20+
var slice = buffer.Slice(totalBytesRead, count - totalBytesRead);
21+
int bytesRead = await stream.ReadAsync(slice, cancellationToken);
2522
if (bytesRead == 0)
2623
break; // Connection closed
27-
2824
totalBytesRead += bytesRead;
2925
}
3026
return totalBytesRead;
3127
}
3228

3329
/// <summary>
34-
/// Read message from the stream
30+
/// Read message from the stream, message buffer should be returned to array pool after use
3531
/// </summary>
3632
/// <param name="stream"></param>
3733
/// <param name="cancellationToken"></param>
3834
/// <returns></returns>
3935
/// <exception cref="InvalidMessageSizeException"></exception>
40-
public static async UniTask<byte[]?> ReadMessageAsync(this Stream stream, CancellationToken cancellationToken)
36+
public static async UniTask<(byte[] buffer, int length)?> ReadMessageAsync(this Stream stream, CancellationToken cancellationToken)
4137
{
4238
if (stream == null)
4339
return null;
4440

45-
// Read message size (4 bytes)
46-
var sizeBuffer = new byte[4];
47-
var bytesRead = await stream.ReadExactAsync(sizeBuffer, 4, cancellationToken);
48-
if (bytesRead == 0 || bytesRead < 4)
49-
return null; // Connection closed
50-
51-
var dataSize = BitConverter.ToInt32(sizeBuffer, 0);
52-
int minSize = 8;
53-
int maxSize = 1024 * 1024; // 1 MB
54-
if (dataSize < minSize || dataSize > maxSize)
41+
byte[] sizeBuffer = ArrayPool<byte>.Shared.Rent(4);
42+
try
5543
{
56-
throw new InvalidMessageSizeException()
44+
var sizeMemory = sizeBuffer.AsMemory(0, 4);
45+
int bytesRead = await stream.ReadExactAsync(sizeMemory, 4, cancellationToken);
46+
if (bytesRead < 4)
47+
return null;
48+
49+
int dataSize = BitConverter.ToInt32(sizeBuffer, 0);
50+
int minSize = 8;
51+
int maxSize = 1024 * 1024; // 1 MB limit
52+
if (dataSize < minSize || dataSize > maxSize)
53+
throw new InvalidMessageSizeException() { Size = dataSize, MinSize = minSize, MaxSize = maxSize };
54+
55+
// Rent buffer for the full message
56+
byte[] dataBuffer = ArrayPool<byte>.Shared.Rent(dataSize);
57+
Memory<byte> dataMemory = dataBuffer.AsMemory(0, dataSize);
58+
59+
// Copy size header into data buffer
60+
sizeMemory.Span.CopyTo(dataMemory.Span);
61+
62+
// Read the rest of the message body directly into the span
63+
int remainingBytes = dataSize - 4;
64+
bytesRead = await stream.ReadExactAsync(dataMemory.Slice(4, remainingBytes), remainingBytes, cancellationToken);
65+
if (bytesRead != remainingBytes)
5766
{
58-
Size = dataSize,
59-
MinSize = minSize,
60-
MaxSize = maxSize
61-
};
62-
}
67+
ArrayPool<byte>.Shared.Return(dataBuffer);
68+
return null; // Connection closed
69+
}
6370

64-
// Read the complete message (including the size we already read)
65-
var dataBuffer = new byte[dataSize];
66-
sizeBuffer.CopyTo(dataBuffer, 0);
67-
68-
// Already read 4 bytes for message size, so decrease by 4
69-
var remainingBytes = dataSize - 4;
70-
// Read next bytes by remaining bytes, skip 4 bytes (message size which already copied above)
71-
bytesRead = await stream.ReadExactAsync(dataBuffer, remainingBytes, cancellationToken, 4);
72-
if (bytesRead != remainingBytes)
73-
return null; // Connection closed
74-
75-
return dataBuffer;
71+
return (dataBuffer, dataSize);
72+
}
73+
finally
74+
{
75+
ArrayPool<byte>.Shared.Return(sizeBuffer);
76+
}
7677
}
7778

7879
/// <summary>
7980
/// Reads exactly the specified number of bytes from the stream
8081
/// </summary>
81-
public static int ReadExact(this Stream stream, byte[] buffer, int count, int offset = 0)
82+
public static int ReadExact(this Stream stream, Span<byte> buffer, int count, CancellationToken cancellationToken)
8283
{
83-
if (stream == null)
84-
return 0;
85-
8684
int totalBytesRead = 0;
87-
while (totalBytesRead < count)
85+
while (totalBytesRead < count && !cancellationToken.IsCancellationRequested)
8886
{
89-
var bytesRead = stream.Read(
90-
buffer, offset + totalBytesRead, count - totalBytesRead);
91-
87+
int bytesRead = stream.Read(buffer.Slice(totalBytesRead, count - totalBytesRead));
9288
if (bytesRead == 0)
9389
break; // Connection closed
94-
9590
totalBytesRead += bytesRead;
9691
}
9792
return totalBytesRead;
9893
}
9994

10095
/// <summary>
101-
/// Read message from the stream
96+
/// Read message from the stream, message buffer should be returned to array pool after use
10297
/// </summary>
10398
/// <param name="stream"></param>
99+
/// <param name="cancellationToken"></param>
104100
/// <returns></returns>
105101
/// <exception cref="InvalidMessageSizeException"></exception>
106-
public static byte[]? ReadMessage(this Stream stream)
102+
public static (byte[] buffer, int length)? ReadMessage(this Stream stream, CancellationToken cancellationToken)
107103
{
108104
if (stream == null)
109105
return null;
110106

111-
// Read message size (4 bytes)
112-
var sizeBuffer = new byte[4];
113-
var bytesRead = stream.ReadExact(sizeBuffer, 4);
114-
if (bytesRead == 0 || bytesRead < 4)
115-
return null; // Connection closed
116-
117-
var dataSize = BitConverter.ToInt32(sizeBuffer, 0);
118-
int minSize = 8;
119-
int maxSize = 1024 * 1024; // 1 MB
120-
if (dataSize < minSize || dataSize > maxSize)
107+
byte[] sizeBuffer = ArrayPool<byte>.Shared.Rent(4);
108+
try
121109
{
122-
throw new InvalidMessageSizeException()
110+
Span<byte> sizeSpan = sizeBuffer.AsSpan(0, 4);
111+
int bytesRead = stream.ReadExact(sizeSpan, 4, cancellationToken);
112+
if (bytesRead < 4)
113+
return null; // Connection closed
114+
115+
int dataSize = BitConverter.ToInt32(sizeBuffer, 0);
116+
const int minSize = 8;
117+
const int maxSize = 1024 * 1024; // 1 MB limit
118+
if (dataSize < minSize || dataSize > maxSize)
119+
throw new InvalidMessageSizeException() { Size = dataSize, MinSize = minSize, MaxSize = maxSize };
120+
121+
// Rent buffer for the full message
122+
byte[] dataBuffer = ArrayPool<byte>.Shared.Rent(dataSize);
123+
Span<byte> dataSpan = dataBuffer.AsSpan(0, dataSize);
124+
125+
// Copy size header into data buffer
126+
sizeSpan.CopyTo(dataSpan);
127+
128+
// Read the rest of the message body directly into the span
129+
int remainingBytes = dataSize - 4;
130+
bytesRead = stream.ReadExact(dataSpan.Slice(4, remainingBytes), remainingBytes, cancellationToken);
131+
if (bytesRead != remainingBytes)
123132
{
124-
Size = dataSize,
125-
MinSize = minSize,
126-
MaxSize = maxSize
127-
};
128-
}
129-
130-
// Read the complete message (including the size we already read)
131-
var dataBuffer = new byte[dataSize];
132-
sizeBuffer.CopyTo(dataBuffer, 0);
133-
134-
// Already read 4 bytes for message size, so decrease by 4
135-
var remainingBytes = dataSize - 4;
136-
// Read next bytes by remaining bytes, skip 4 bytes (message size which already copied above)
137-
bytesRead = stream.ReadExact(dataBuffer, remainingBytes, 4);
138-
if (bytesRead != remainingBytes)
139-
return null; // Connection closed
133+
ArrayPool<byte>.Shared.Return(dataBuffer);
134+
return null; // Connection closed
135+
}
140136

141-
return dataBuffer;
137+
return (dataBuffer, dataSize);
138+
}
139+
finally
140+
{
141+
ArrayPool<byte>.Shared.Return(sizeBuffer);
142+
}
142143
}
143144

144145
public static async UniTask WriteMessageAsync<T>(this Stream stream, T message, CancellationToken cancellationToken)

SimpleNetworkManager.NET/Network/TcpTransport/TcpClientConnection.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Insthync.SimpleNetworkManager.NET.Messages.Error;
44
using Microsoft.Extensions.Logging;
55
using System;
6+
using System.Buffers;
67
using System.IO;
78
using System.Net.Sockets;
89
using System.Threading;
@@ -97,7 +98,8 @@ public async UniTask HandleConnectionAsync(CancellationToken cancellationToken)
9798
_logger.LogDebug("Client {ConnectionId} disconnected during message receive", ConnectionId);
9899
break; // Connection closed
99100
}
100-
OnMessageReceived(message);
101+
OnMessageReceived(message.Value.buffer, message.Value.length);
102+
ArrayPool<byte>.Shared.Return(message.Value.buffer);
101103
}
102104
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
103105
{
@@ -138,8 +140,9 @@ public async UniTask HandleConnectionAsync(CancellationToken cancellationToken)
138140

139141
/// <summary>
140142
/// Receives a complete message from the network stream with timeout handling
143+
/// message buffer should be returned to array pool after use
141144
/// </summary>
142-
private async UniTask<byte[]?> ReceiveMessageAsync(CancellationToken cancellationToken)
145+
private async UniTask<(byte[] buffer, int length)?> ReceiveMessageAsync(CancellationToken cancellationToken)
143146
{
144147
if (_networkStream == null || !_isConnected)
145148
return null;

SimpleNetworkManager.NET/Services/MessageRouterService.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,18 @@ public void RegisterHandler<T>(BaseMessageHandler<T> handler, bool dismissWarnin
5959
/// Routes a message to the appropriate handler
6060
/// </summary>
6161
/// <param name="clientConnection">Client connection that sent the message</param>
62-
/// <param name="message">Message to route</param>
62+
/// <param name="buffer">Message buffer</param>
63+
/// <param name="length">Length of buffer</param>
6364
/// <returns>Task representing the async routing operation</returns>
6465
/// <exception cref="ArgumentNullException">Thrown when client or message is null</exception>
65-
public async UniTask RouteMessageAsync(BaseClientConnection clientConnection, byte[] message)
66+
public async UniTask RouteMessageAsync(BaseClientConnection clientConnection, byte[] buffer, int length)
6667
{
6768
if (clientConnection == null)
6869
throw new ArgumentNullException(nameof(clientConnection));
69-
if (message == null)
70-
throw new ArgumentNullException(nameof(message));
70+
if (buffer == null)
71+
throw new ArgumentNullException(nameof(buffer));
7172

72-
var data = BaseMessage.ExtractMessageData(message, out var messageType);
73+
var data = BaseMessage.ExtractMessageData(buffer, length, out var messageType);
7374
if (_handlers.TryGetValue(messageType, out var handler))
7475
{
7576
var messageInstance = handler.GetMessageInstance();

0 commit comments

Comments
 (0)