Skip to content

Commit bc97216

Browse files
author
Oren (electricessence)
committed
Extended cancellation capability.
1 parent fd3b359 commit bc97216

2 files changed

Lines changed: 32 additions & 15 deletions

File tree

Extensions._.cs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static ISourceBlock<T[]> Batch<T>(this ISourceBlock<T> source,
5454
var batchBlock = dataflowBlockOptions == null
5555
? new BatchBlock<T>(batchSize)
5656
: new BatchBlock<T>(batchSize, dataflowBlockOptions);
57+
5758
source.LinkToWithCompletion(batchBlock);
5859
return batchBlock;
5960
}
@@ -99,16 +100,20 @@ async Task<int> ToTargetBlockAsyncCore()
99100
foreach (var entry in source)
100101
{
101102
if (cancellationToken.IsCancellationRequested
102-
|| !target.Post(entry) && !await target.SendAsync(entry))
103+
|| !target.Post(entry) && !await target.SendAsync(entry, cancellationToken))
103104
break;
104105

105106
count++;
106107
}
108+
109+
cancellationToken.ThrowIfCancellationRequested();
110+
107111
return count;
108112
}
109113
}
110114

111-
public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source)
115+
public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
116+
CancellationToken cancellationToken = default)
112117
{
113118
if (source == null)
114119
throw new NullReferenceException();
@@ -117,12 +122,16 @@ public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T>
117122
var result = new List<T>();
118123
do
119124
{
120-
while (source.TryReceive(null, out var e))
125+
while (!cancellationToken.IsCancellationRequested
126+
&& source.TryReceive(null, out var e))
121127
{
122128
result.Add(e);
123129
}
124130
}
125-
while (await source.OutputAvailableAsync());
131+
while (!cancellationToken.IsCancellationRequested
132+
&& await source.OutputAvailableAsync(cancellationToken));
133+
134+
cancellationToken.ThrowIfCancellationRequested();
126135

127136
return result;
128137
}
@@ -163,23 +172,27 @@ async Task CallCompleteWhenFinished()
163172

164173
public static async Task<int> AllLinesTo(this TextReader source,
165174
ITargetBlock<string> target,
166-
bool completeAndWait = false)
175+
bool completeAndAwait = false,
176+
CancellationToken cancellationToken = default)
167177
{
168178
if (source == null)
169179
throw new NullReferenceException();
170180
Contract.EndContractBlock();
171181

172182
var count = 0;
173183
string line;
174-
while ((line = await source.ReadLineAsync()) != null)
184+
while (!cancellationToken.IsCancellationRequested
185+
&& (line = await source.ReadLineAsync()) != null)
175186
{
176-
if (!target.Post(line) && !await target.SendAsync(line))
187+
if (!target.Post(line) && !await target.SendAsync(line, cancellationToken))
177188
break;
178189

179190
count++;
180191
}
181192

182-
if (completeAndWait)
193+
cancellationToken.ThrowIfCancellationRequested();
194+
195+
if (completeAndAwait)
183196
await target.CompleteAsync();
184197

185198
return count;
@@ -188,24 +201,28 @@ public static async Task<int> AllLinesTo(this TextReader source,
188201
public static async Task<int> AllLinesTo<T>(this TextReader source,
189202
ITargetBlock<T> target,
190203
Func<string, T> transform,
191-
bool completeAndWait = false)
204+
bool completeAndAwait = false,
205+
CancellationToken cancellationToken = default)
192206
{
193207
if (source == null)
194208
throw new NullReferenceException();
195209
Contract.EndContractBlock();
196210

197211
var count = 0;
198212
string line;
199-
while ((line = await source.ReadLineAsync()) != null)
213+
while (!cancellationToken.IsCancellationRequested
214+
&& (line = await source.ReadLineAsync()) != null)
200215
{
201216
var e = transform(line);
202-
if (!target.Post(e) && !await target.SendAsync(e))
217+
if (!target.Post(e) && !await target.SendAsync(e, cancellationToken))
203218
break;
204219

205220
count++;
206221
}
207222

208-
if (completeAndWait)
223+
cancellationToken.ThrowIfCancellationRequested();
224+
225+
if (completeAndAwait)
209226
await target.CompleteAsync();
210227

211228
return count;

Open.Threading.Dataflow.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ Part of the "Open" set of libraries.</Description>
1515
<RepositoryUrl>https://github.com/electricessence/Open.Threading.Dataflow/</RepositoryUrl>
1616
<RepositoryType>git</RepositoryType>
1717
<PackageTags>dotnet, dotnet-core, dotnetcore, cs, dataflow, tpl, extensions</PackageTags>
18-
<Version>2.1.1</Version>
18+
<Version>2.1.2</Version>
1919
<PackageReleaseNotes></PackageReleaseNotes>
20-
<AssemblyVersion>2.1.1.0</AssemblyVersion>
21-
<FileVersion>2.1.1.0</FileVersion>
20+
<AssemblyVersion>2.1.2.0</AssemblyVersion>
21+
<FileVersion>2.1.2.0</FileVersion>
2222
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2323
</PropertyGroup>
2424

0 commit comments

Comments
 (0)