Skip to content

Commit fd3b359

Browse files
author
Oren (electricessence)
committed
Updates.
1 parent cabcfb2 commit fd3b359

12 files changed

Lines changed: 64 additions & 33 deletions

Extensions.ActionBlock.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
namespace System.Threading.Tasks.Dataflow
1+
using System;
2+
using System.Threading.Tasks;
3+
using System.Threading.Tasks.Dataflow;
4+
5+
namespace Open.Threading.Dataflow
26
{
37
public static class ActionBlock
48
{

Extensions.LinkTo.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
using System.Threading.Tasks;
55
using System.Threading.Tasks.Dataflow;
66

7-
namespace Open.Dataflow
7+
namespace Open.Threading.Dataflow
88
{
99
public static partial class DataFlowExtensions
1010
{

Extensions.Observable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using System;
22

3-
namespace Open.Dataflow
3+
namespace Open.Threading.Dataflow
44
{
55
public static partial class DataFlowExtensions
66
{

Extensions.Pipe.cs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
using System.Threading.Tasks.Dataflow;
66

77

8-
namespace Open.Dataflow
8+
namespace Open.Threading.Dataflow
99
{
1010
public static partial class DataFlowExtensions
1111
{
@@ -74,6 +74,7 @@ public static ActionBlock<T> Pipe<T>(this ISourceBlock<T> source,
7474
var receiver = options == null
7575
? new ActionBlock<T>(handler)
7676
: new ActionBlock<T>(handler, options);
77+
7778
source.LinkToWithCompletion(receiver);
7879
return receiver;
7980
}
@@ -113,7 +114,7 @@ public static ActionBlock<T> PipeAsync<T>(this ISourceBlock<T> source,
113114
/// <param name="handler">The handler function to apply.</param>
114115
/// <param name="cancellationToken">An optional cancellation token.</param>
115116
/// <returns>The ActionBlock created.</returns>
116-
public static ActionBlock<T> Parallel<T>(this ISourceBlock<T> source,
117+
public static ActionBlock<T> PipeConcurrently<T>(this ISourceBlock<T> source,
117118
int maxConcurrency,
118119
Action<T> handler,
119120
CancellationToken cancellationToken = default)
@@ -133,7 +134,7 @@ public static ActionBlock<T> Parallel<T>(this ISourceBlock<T> source,
133134
/// <param name="handler">The async handler function to apply.</param>
134135
/// <param name="cancellationToken">An optional cancellation token.</param>
135136
/// <returns>The ActionBlock created.</returns>
136-
public static ActionBlock<T> ParallelAsync<T>(this ISourceBlock<T> source,
137+
public static ActionBlock<T> PipeConcurrentlyAsync<T>(this ISourceBlock<T> source,
137138
int maxConcurrency,
138139
Func<T, Task> handler,
139140
CancellationToken cancellationToken = default)
@@ -154,7 +155,7 @@ public static ActionBlock<T> ParallelAsync<T>(this ISourceBlock<T> source,
154155
/// <param name="transform">The transform function to apply.</param>
155156
/// <param name="cancellationToken">An optional cancellation token.</param>
156157
/// <returns>The TransformBlock created.</returns>
157-
public static IReceivableSourceBlock<TOut> Parallel<TIn, TOut>(this ISourceBlock<TIn> source,
158+
public static IReceivableSourceBlock<TOut> PipeConcurrently<TIn, TOut>(this ISourceBlock<TIn> source,
158159
int maxConcurrency,
159160
Func<TIn, TOut> transform,
160161
CancellationToken cancellationToken = default)
@@ -175,7 +176,7 @@ public static IReceivableSourceBlock<TOut> Parallel<TIn, TOut>(this ISourceBlock
175176
/// <param name="transform">The async transform function to apply.</param>
176177
/// <param name="cancellationToken">An optional cancellation token.</param>
177178
/// <returns>The TransformBlock created.</returns>
178-
public static IReceivableSourceBlock<TOut> ParallelAsync<TIn, TOut>(this ISourceBlock<TIn> source,
179+
public static IReceivableSourceBlock<TOut> PipeConcurrentlyAsync<TIn, TOut>(this ISourceBlock<TIn> source,
179180
int maxConcurrency,
180181
Func<TIn, Task<TOut>> transform,
181182
CancellationToken cancellationToken = default)
@@ -196,7 +197,7 @@ public static IReceivableSourceBlock<TOut> ParallelAsync<TIn, TOut>(this ISource
196197
/// <param name="handler">The handler function to apply.</param>
197198
/// <param name="cancellationToken">An optional cancellation token.</param>
198199
/// <returns>The ActionBlock created.</returns>
199-
public static ActionBlock<T> BoundedParallel<T>(this ISourceBlock<T> source,
200+
public static ActionBlock<T> Pipe<T>(this ISourceBlock<T> source,
200201
int capacity,
201202
int maxConcurrency,
202203
Action<T> handler,
@@ -219,7 +220,7 @@ public static ActionBlock<T> BoundedParallel<T>(this ISourceBlock<T> source,
219220
/// <param name="handler">The async handler function to apply.</param>
220221
/// <param name="cancellationToken">An optional cancellation token.</param>
221222
/// <returns>The ActionBlock created.</returns>
222-
public static ActionBlock<T> BoundedParallelAsync<T>(this ISourceBlock<T> source,
223+
public static ActionBlock<T> PipeAsync<T>(this ISourceBlock<T> source,
223224
int capacity,
224225
int maxConcurrency,
225226
Func<T, Task> handler,
@@ -243,7 +244,7 @@ public static ActionBlock<T> BoundedParallelAsync<T>(this ISourceBlock<T> source
243244
/// <param name="transform">The transform function to apply.</param>
244245
/// <param name="cancellationToken">An optional cancellation token.</param>
245246
/// <returns>The TransformBlock created.</returns>
246-
public static IReceivableSourceBlock<TOut> BoundedParallel<TIn, TOut>(this ISourceBlock<TIn> source,
247+
public static IReceivableSourceBlock<TOut> Pipe<TIn, TOut>(this ISourceBlock<TIn> source,
247248
int capacity,
248249
int maxConcurrency,
249250
Func<TIn, TOut> transform,
@@ -268,7 +269,7 @@ public static IReceivableSourceBlock<TOut> BoundedParallel<TIn, TOut>(this ISour
268269
/// <param name="transform">The async transform function to apply.</param>
269270
/// <param name="cancellationToken">An optional cancellation token.</param>
270271
/// <returns>The TransformBlock created.</returns>
271-
public static IReceivableSourceBlock<TOut> BoundedParallelAsync<TIn, TOut>(this ISourceBlock<TIn> source,
272+
public static IReceivableSourceBlock<TOut> PipeAsync<TIn, TOut>(this ISourceBlock<TIn> source,
272273
int capacity,
273274
int maxConcurrency,
274275
Func<TIn, Task<TOut>> transform,
@@ -290,7 +291,7 @@ public static IReceivableSourceBlock<TOut> BoundedParallelAsync<TIn, TOut>(this
290291
/// <param name="handler">The handler function to apply.</param>
291292
/// <param name="cancellationToken">An optional cancellation token.</param>
292293
/// <returns>The ActionBlock created.</returns>
293-
public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
294+
public static ActionBlock<T> PipeLimited<T>(this ISourceBlock<T> source,
294295
int capacity,
295296
Action<T> handler,
296297
CancellationToken cancellationToken = default)
@@ -310,7 +311,7 @@ public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
310311
/// <param name="handler">The async handler function to apply.</param>
311312
/// <param name="cancellationToken">An optional cancellation token.</param>
312313
/// <returns>The ActionBlock created.</returns>
313-
public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
314+
public static ActionBlock<T> PipeLimitedAsync<T>(this ISourceBlock<T> source,
314315
int capacity,
315316
Func<T, Task> handler,
316317
CancellationToken cancellationToken = default)
@@ -331,7 +332,7 @@ public static ActionBlock<T> Bounded<T>(this ISourceBlock<T> source,
331332
/// <param name="transform">The transform function to apply.</param>
332333
/// <param name="cancellationToken">An optional cancellation token.</param>
333334
/// <returns>The TransformBlock created.</returns>
334-
public static IReceivableSourceBlock<TOut> Bounded<TIn, TOut>(this ISourceBlock<TIn> source,
335+
public static IReceivableSourceBlock<TOut> PipeLimited<TIn, TOut>(this ISourceBlock<TIn> source,
335336
int capacity,
336337
Func<TIn, TOut> transform,
337338
CancellationToken cancellationToken = default)
@@ -352,7 +353,7 @@ public static IReceivableSourceBlock<TOut> Bounded<TIn, TOut>(this ISourceBlock<
352353
/// <param name="transform">The async transform function to apply.</param>
353354
/// <param name="cancellationToken">An optional cancellation token.</param>
354355
/// <returns>The TransformBlock created.</returns>
355-
public static IReceivableSourceBlock<TOut> Bounded<TIn, TOut>(this ISourceBlock<TIn> source,
356+
public static IReceivableSourceBlock<TOut> PipeLimitedAsync<TIn, TOut>(this ISourceBlock<TIn> source,
356357
int capacity,
357358
Func<TIn, Task<TOut>> transform,
358359
CancellationToken cancellationToken = default)

Extensions.TransformBlock.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
namespace System.Threading.Tasks.Dataflow
1+
using System;
2+
using System.Threading.Tasks;
3+
using System.Threading.Tasks.Dataflow;
4+
5+
namespace Open.Threading.Dataflow
26
{
37
public static class TransformBlock
48
{

Extensions._.cs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,16 @@
77
using System.Threading.Tasks;
88
using System.Threading.Tasks.Dataflow;
99

10-
namespace Open.Dataflow
10+
namespace Open.Threading.Dataflow
1111
{
1212
public static partial class DataFlowExtensions
1313
{
14+
public static Task CompleteAsync(this IDataflowBlock source)
15+
{
16+
source.Complete();
17+
return source.Completion;
18+
}
19+
1420
public static ISourceBlock<T> Buffer<T>(this ISourceBlock<T> source, DataflowBlockOptions dataflowBlockOptions = null)
1521
{
1622
if (source == null)
@@ -105,7 +111,7 @@ async Task<int> ToTargetBlockAsyncCore()
105111
public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source)
106112
{
107113
if (source == null)
108-
throw new NullReferenceException();
114+
throw new NullReferenceException();
109115
Contract.EndContractBlock();
110116

111117
var result = new List<T>();
@@ -156,7 +162,8 @@ async Task CallCompleteWhenFinished()
156162
}
157163

158164
public static async Task<int> AllLinesTo(this TextReader source,
159-
ITargetBlock<string> target)
165+
ITargetBlock<string> target,
166+
bool completeAndWait = false)
160167
{
161168
if (source == null)
162169
throw new NullReferenceException();
@@ -171,12 +178,17 @@ public static async Task<int> AllLinesTo(this TextReader source,
171178

172179
count++;
173180
}
181+
182+
if (completeAndWait)
183+
await target.CompleteAsync();
184+
174185
return count;
175-
}
186+
}
176187

177188
public static async Task<int> AllLinesTo<T>(this TextReader source,
178189
ITargetBlock<T> target,
179-
Func<string, T> transform)
190+
Func<string, T> transform,
191+
bool completeAndWait = false)
180192
{
181193
if (source == null)
182194
throw new NullReferenceException();
@@ -193,6 +205,9 @@ public static async Task<int> AllLinesTo<T>(this TextReader source,
193205
count++;
194206
}
195207

208+
if (completeAndWait)
209+
await target.CompleteAsync();
210+
196211
return count;
197212
}
198213

@@ -201,7 +216,7 @@ public static T OnComplete<T>(this T source, Action oncomplete)
201216
{
202217
source.Completion.ContinueWith(task => oncomplete());
203218
return source;
204-
}
219+
}
205220

206221
public static T OnComplete<T>(this T source, Action<Task> oncomplete)
207222
where T : IDataflowBlock
@@ -210,10 +225,17 @@ public static T OnComplete<T>(this T source, Action<Task> oncomplete)
210225
return source;
211226
}
212227

228+
public static T OnCompletedSuccessfully<T>(this T source, Action oncomplete)
229+
where T : IDataflowBlock
230+
{
231+
source.Completion.OnFullfilled(oncomplete);
232+
return source;
233+
}
234+
213235
public static T OnFault<T>(this T source, Action<Exception> onfault)
214236
where T : IDataflowBlock
215237
{
216-
source.Completion.OnFaulted(task => onfault(task.InnerException));
238+
source.Completion.OnFaulted(ex => onfault(ex.InnerException));
217239
return source;
218240
}
219241

@@ -225,3 +247,4 @@ public static void Fault(this IDataflowBlock target, string message, Exception i
225247

226248
}
227249
}
250+

Filters/AcceptOrPass.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.Threading.Tasks;
33
using System.Threading.Tasks.Dataflow;
44

5-
namespace Open.Dataflow
5+
namespace Open.Threading.Dataflow
66
{
77
internal sealed class AcceptOrPassBlock<T> : ITargetBlock<T>
88
{

Filters/AutoComplete.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using Open.Threading;
22
using System.Threading.Tasks.Dataflow;
33

4-
namespace Open.Dataflow
4+
namespace Open.Threading.Dataflow
55
{
66

77
internal class AutoCompleteFilter<T> : TargetBlockFilter<T>

Filters/Changed.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using Open.Threading;
22
using System.Threading.Tasks.Dataflow;
33

4-
namespace Open.Dataflow
4+
namespace Open.Threading.Dataflow
55
{
66
internal class ChangedFilter<T> : TargetBlockFilter<T>
77
{

Filters/Distinct.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using System.Collections.Generic;
22
using System.Threading.Tasks.Dataflow;
33

4-
namespace Open.Dataflow
4+
namespace Open.Threading.Dataflow
55
{
66
internal class DistinctFilter<T> : TargetBlockFilter<T>
77
{

0 commit comments

Comments
 (0)