@@ -75,6 +75,72 @@ public static async UniTask<int> ReadExactAsync(this Stream stream, byte[] buffe
7575 return dataBuffer ;
7676 }
7777
78+ /// <summary>
79+ /// Reads exactly the specified number of bytes from the stream
80+ /// </summary>
81+ public static int ReadExact ( this Stream stream , byte [ ] buffer , int count , int offset = 0 )
82+ {
83+ if ( stream == null )
84+ return 0 ;
85+
86+ int totalBytesRead = 0 ;
87+ while ( totalBytesRead < count )
88+ {
89+ var bytesRead = stream . Read (
90+ buffer , offset + totalBytesRead , count - totalBytesRead ) ;
91+
92+ if ( bytesRead == 0 )
93+ break ; // Connection closed
94+
95+ totalBytesRead += bytesRead ;
96+ }
97+ return totalBytesRead ;
98+ }
99+
100+ /// <summary>
101+ /// Read message from the stream
102+ /// </summary>
103+ /// <param name="stream"></param>
104+ /// <returns></returns>
105+ /// <exception cref="InvalidMessageSizeException"></exception>
106+ public static byte [ ] ? ReadMessage ( this Stream stream )
107+ {
108+ if ( stream == null )
109+ return null ;
110+
111+ // Read message size (4 bytes)
112+ var sizeBuffer = new byte [ 4 ] ;
113+ var bytesRead = stream . ReadExact ( sizeBuffer , 4 ) ;
114+ if ( bytesRead == 0 || bytesRead < 4 )
115+ return null ; // Connection closed
116+
117+ var dataSize = BitConverter . ToInt32 ( sizeBuffer , 0 ) ;
118+ int minSize = 8 ;
119+ int maxSize = 1024 * 1024 ; // 1 MB
120+ if ( dataSize < minSize || dataSize > maxSize )
121+ {
122+ throw new InvalidMessageSizeException ( )
123+ {
124+ Size = dataSize ,
125+ MinSize = minSize ,
126+ MaxSize = maxSize
127+ } ;
128+ }
129+
130+ // Read the complete message (including the size we already read)
131+ var dataBuffer = new byte [ dataSize ] ;
132+ sizeBuffer . CopyTo ( dataBuffer , 0 ) ;
133+
134+ // Already read 4 bytes for message size, so decrease by 4
135+ var remainingBytes = dataSize - 4 ;
136+ // Read next bytes by remaining bytes, skip 4 bytes (message size which already copied above)
137+ bytesRead = stream . ReadExact ( dataBuffer , remainingBytes , 4 ) ;
138+ if ( bytesRead != remainingBytes )
139+ return null ; // Connection closed
140+
141+ return dataBuffer ;
142+ }
143+
78144 public static async UniTask WriteMessageAsync < T > ( this Stream stream , T message , CancellationToken cancellationToken )
79145 where T : BaseMessage
80146 {
@@ -99,5 +165,30 @@ public static async UniTask WriteMessageAsync<T>(this Stream stream, T message,
99165 await stream . WriteAsync ( serializedData , cancellationToken ) ;
100166 await stream . FlushAsync ( cancellationToken ) ;
101167 }
168+
169+ public static void WriteMessage < T > ( this Stream stream , T message )
170+ where T : BaseMessage
171+ {
172+ if ( stream == null )
173+ return ;
174+
175+ uint messageType = message . GetMessageType ( ) ;
176+ byte [ ] serializedData ;
177+ try
178+ {
179+ serializedData = message . Serialize ( ) ;
180+ }
181+ catch ( MessagePack . MessagePackSerializationException )
182+ {
183+ throw ;
184+ }
185+ catch ( Exception )
186+ {
187+ throw ;
188+ }
189+
190+ stream . Write ( serializedData ) ;
191+ stream . Flush ( ) ;
192+ }
102193 }
103194}
0 commit comments