Skip to content

Commit cd3e2c9

Browse files
committed
📝 Update usage sample and docs
1 parent 115a97a commit cd3e2c9

4 files changed

Lines changed: 37 additions & 15 deletions

File tree

readme.md

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,37 @@ await client.ConnectAsync(serverUri, CancellationToken.None);
1818

1919
using IWebSocketPipe pipe = WebSocketPipe.Create(client, closeWhenCompleted: true);
2020

21-
var read = Task.Run(async () =>
22-
{
23-
var read = await pipe.Input.ReadAsync();
24-
Output.WriteLine("Client: " + Encoding.UTF8.GetString(read.Buffer));
25-
await pipe.CompleteAsync(WebSocketCloseStatus.NormalClosure, "Client Done");
26-
});
27-
21+
// Start the pipe before hooking up the processing
2822
var run = pipe.RunAsync();
2923

30-
await pipe.Output.WriteAsync(Encoding.UTF8.GetBytes("hello").AsMemory());
31-
24+
// Wait for completion of processing code
25+
await Task.WhenAny(
26+
ReadIncoming(pipe.Input),
27+
SendOutgoing(pipe.Output));
3228

29+
// When the processing completes, the overall pipe run will also complete
30+
await run;
3331

32+
// Reads incoming data and writes to the console
33+
async Task ReadIncoming(PipeReader reader)
34+
{
35+
while (await reader.ReadAsync() is var result && !result.IsCompleted)
36+
{
37+
Console.WriteLine($"Received: {Encoding.UTF8.GetString(result.Buffer)}");
38+
reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
39+
}
40+
Console.WriteLine($"Done reading.");
41+
}
42+
43+
// Reads console input and writes to pipe until an empty line is entered
44+
async Task SendOutgoing(PipeWriter writer)
45+
{
46+
while (Console.ReadLine() is var line && line?.Length > 0)
47+
{
48+
Encoding.UTF8.GetBytes(line, writer);
49+
}
50+
await writer.CompleteAsync();
51+
Console.WriteLine($"Done writing.");
52+
}
3453
```
3554

src/Tests/EndToEnd.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ public async Task RunAsync()
2828

2929
var run = pipe.RunAsync();
3030

31-
await pipe.Output.WriteAsync(Encoding.UTF8.GetBytes("hello").AsMemory());
31+
pipe.Output.Advance(Encoding.UTF8.GetBytes("hello", pipe.Output.GetSpan()));
32+
await pipe.Output.FlushAsync();
33+
34+
//await pipe.Output.WriteAsync(Encoding.UTF8.GetBytes("hello").AsMemory());
3235

3336
await read;
3437
await run;
@@ -44,7 +47,7 @@ async Task Echo(IDuplexPipe pipe)
4447
Output.WriteLine($"Echoing: {Encoding.UTF8.GetString(result.Buffer)}");
4548
// Just assume we get a single-segment entry, for simplicity
4649
await pipe.Output.WriteAsync(result.Buffer.First);
47-
pipe.Input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
50+
pipe.Input.AdvanceTo(result.Buffer.End);
4851
}
4952
Output.WriteLine($"Server: Done.");
5053
}

src/Tests/SimpleWebSocketPipeTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public async Task WhenReceivingChunks_PipeReaderExposesFullMessage()
176176
for (var i = 0; i < result.Buffer.Length; i++)
177177
await s.SendAsync(result.Buffer.Slice(i, 1).First, WebSocketMessageType.Binary, i == result.Buffer.Length - 1, default);
178178

179-
serverPipe.Input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
179+
serverPipe.Input.AdvanceTo(result.Buffer.End);
180180
}
181181
await serverRun;
182182
}, null, Output);
@@ -198,7 +198,7 @@ async Task Echo(IWebSocketPipe pipe)
198198
while (await pipe.Input.ReadAsync() is var result && !result.IsCompleted)
199199
{
200200
await pipe.Output.WriteAsync(result.Buffer.First);
201-
pipe.Input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
201+
pipe.Input.AdvanceTo(result.Buffer.End);
202202
}
203203
}
204204
}

src/WebSocketPipe/SimpleWebSocketPipe.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public SimpleWebSocketPipe(WebSocket webSocket, WebSocketPipeOptions options)
4444
public async Task RunAsync(CancellationToken cancellation = default)
4545
{
4646
if (webSocket.State != WebSocketState.Open)
47-
throw new InvalidOperationException();
47+
throw new InvalidOperationException($"WebSocket must be opened. State was {webSocket.State}");
4848

4949
var writing = FillInputAsync(cancellation);
5050
var reading = SendOutputAsync(cancellation);
@@ -162,7 +162,7 @@ async Task SendOutputAsync(CancellationToken cancellation)
162162
}
163163
}
164164

165-
outputPipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
165+
outputPipe.Reader.AdvanceTo(result.Buffer.End);
166166

167167
}
168168
catch (Exception ex) when (ex is OperationCanceledException ||

0 commit comments

Comments
 (0)