Skip to content

Commit 150c616

Browse files
Fix CPU spin in BatchingChannelReader when source completes (#53)
* Initial plan * Add repro test for batching hang issue Co-authored-by: electricessence <5899455+electricessence@users.noreply.github.com> * Fix CPU hang in BatchingChannelReader.WaitToReadAsyncCore The issue was that after Task.WhenAny completed, the code did not await the source task before looping. This caused a tight CPU-consuming loop when the source channel was closed but the buffer was empty, as the closed source would immediately return a completed ValueTask on each WaitToReadAsync call. The fix properly awaits the source task after WhenAny to check its completion status, preventing the tight loop. This matches the pattern used in the base BufferingChannelReader class. Co-authored-by: electricessence <5899455+electricessence@users.noreply.github.com> * Address code review feedback - Remove unused System.Text.Json import from test - Use consistent attribute style in test - Add clarifying comment explaining the buffer completion check Co-authored-by: electricessence <5899455+electricessence@users.noreply.github.com> * Bump version to 9.1.2 Updated Open.ChannelExtensions.csproj to increase the project version from 9.1.1 to 9.1.2. No other modifications were made. --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: electricessence <5899455+electricessence@users.noreply.github.com>
1 parent 9d1aec2 commit 150c616

3 files changed

Lines changed: 69 additions & 2 deletions

File tree

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System.Runtime.CompilerServices;
2+
3+
namespace Open.ChannelExtensions.Tests;
4+
5+
public static class HangReproTest
6+
{
7+
/// <summary>
8+
/// Reproduction test for the hanging issue described in GitHub.
9+
/// This test should fail if the bug exists (by timing out).
10+
/// </summary>
11+
[Fact]
12+
public static async Task MultipleBatchReadersDoNotHang()
13+
{
14+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); // 30 second timeout
15+
var token = cts.Token;
16+
17+
var counts = new int[5]; // Using 5 tasks instead of 10 for faster testing
18+
var tasks = Enumerable.Range(0, 5)
19+
.Select(x => Task.Run(async () =>
20+
{
21+
for (int i = 0; i < 50; i++) // Run 50 iterations instead of infinite
22+
{
23+
if (token.IsCancellationRequested)
24+
break;
25+
26+
await GetSource()
27+
.ToChannel(singleReader: true, cancellationToken: token)
28+
.Batch(Random.Shared.Next(15, 30), singleReader: true)
29+
.ReadAllAsync(async batch =>
30+
{
31+
await Task.Delay(Random.Shared.Next(2, 15), token);
32+
}, token);
33+
34+
counts[x] += 1;
35+
}
36+
}, token)).ToArray();
37+
38+
// Wait for all tasks to complete or timeout
39+
await Task.WhenAll(tasks);
40+
41+
// Verify all tasks completed at least some iterations
42+
Assert.All(counts, count => Assert.True(count >= 40, $"Count was {count}, expected at least 40"));
43+
}
44+
45+
private static async IAsyncEnumerable<int> GetSource([EnumeratorCancellation] CancellationToken cancellationToken = default)
46+
{
47+
foreach (var value in Enumerable.Range(0, Random.Shared.Next(80, 120)))
48+
{
49+
if (cancellationToken.IsCancellationRequested)
50+
yield break;
51+
52+
yield return value;
53+
54+
if (value % Random.Shared.Next(15, 25) == 0)
55+
await Task.Delay(Random.Shared.Next(2, 15), cancellationToken);
56+
}
57+
}
58+
}

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>9.1.1</Version>
25+
<Version>9.1.2</Version>
2626
<PackageReleaseNotes>Ensure the current task scheduler is used and that it can be configured.</PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>

Open.ChannelExtensions/Readers/BatchingChannelReader.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,22 @@ protected override async ValueTask<bool> WaitToReadAsyncCore(
245245
}
246246

247247
// WhenAny will not throw when a task is cancelled.
248-
await Task.WhenAny(s.AsTask(), b).ConfigureAwait(false);
248+
Task<bool> sTask = s.AsTask();
249+
await Task.WhenAny(sTask, b).ConfigureAwait(false);
249250
if (b.IsCompleted) // Assuming it was bufferWait that completed.
250251
{
251252
await tokenSource.CancelAsync().ConfigureAwait(false);
252253
return await b.ConfigureAwait(false);
253254
}
254255

256+
// Await the source task to check if it completed and get its result
257+
// This prevents a tight loop when the source is closed
258+
await sTask.ConfigureAwait(false);
259+
// Check if buffer completed while we were waiting (e.g., timeout forced a batch)
260+
// If so, return immediately without trying to pipe more items
261+
if (b.IsCompleted)
262+
return await b.ConfigureAwait(false);
263+
255264
TryPipeItems(false);
256265

257266
if (b.IsCompleted || token.IsCancellationRequested)

0 commit comments

Comments
 (0)