Skip to content

Commit cabcfb2

Browse files
author
Oren (electricessence)
committed
Updated to 2.0
1 parent 26b945d commit cabcfb2

13 files changed

Lines changed: 832 additions & 266 deletions

Extensions.ActionBlock.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace System.Threading.Tasks.Dataflow
2+
{
3+
public static class ActionBlock
4+
{
5+
public static ActionBlock<T> New<T>(Action<T> action)
6+
=> new ActionBlock<T>(action);
7+
8+
public static ActionBlock<T> New<T>(Action<T> action, ExecutionDataflowBlockOptions options)
9+
=> new ActionBlock<T>(action, options);
10+
11+
public static ActionBlock<T> New<T>(Action<T> consumer, int maxParallel)
12+
=> new ActionBlock<T>(consumer, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });
13+
14+
public static ActionBlock<T> NewAsync<T>(Func<T, Task> action)
15+
=> new ActionBlock<T>(action);
16+
17+
public static ActionBlock<T> NewAsync<T>(Func<T, Task> action, ExecutionDataflowBlockOptions options)
18+
=> new ActionBlock<T>(action, options);
19+
20+
public static ActionBlock<T> NewAsync<T>(Func<T, Task> consumer, int maxParallel)
21+
=> new ActionBlock<T>(consumer, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = maxParallel });
22+
}
23+
}

Extensions.LinkTo.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using Open.Threading.Tasks;
2+
using System;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using System.Threading.Tasks.Dataflow;
6+
7+
namespace Open.Dataflow
8+
{
9+
public static partial class DataFlowExtensions
10+
{
11+
public static IDisposable LinkTo<T>(this ISourceBlock<T> producer,
12+
Action<T> consumer)
13+
=> producer.LinkTo(new ActionBlock<T>(consumer));
14+
15+
public static IDisposable LinkToAsync<T>(this ISourceBlock<T> producer,
16+
Func<T, Task> consumer)
17+
=> producer.LinkTo(new ActionBlock<T>(consumer));
18+
19+
public static IDisposable LinkToWithCompletion<T>(this ISourceBlock<T> producer,
20+
ITargetBlock<T> consumer)
21+
=> producer.LinkTo(consumer, new DataflowLinkOptions() { PropagateCompletion = true });
22+
23+
24+
public static T PropagateFaultsTo<T>(this T source, params IDataflowBlock[] targets)
25+
where T : IDataflowBlock
26+
{
27+
source.Completion.OnFaulted(ex =>
28+
{
29+
foreach (var target in targets.Where(t => t != null))
30+
target.Fault(ex.InnerException);
31+
});
32+
return source;
33+
}
34+
35+
public static T PropagateCompletionTo<T>(this T source, params IDataflowBlock[] targets)
36+
where T : IDataflowBlock
37+
{
38+
source.Completion.ContinueWith(task =>
39+
{
40+
foreach (var target in targets.Where(t => t != null))
41+
{
42+
if (task.IsFaulted)
43+
// ReSharper disable once PossibleNullReferenceException
44+
target.Fault(task.Exception.InnerException);
45+
else
46+
target.Complete();
47+
}
48+
});
49+
return source;
50+
}
51+
52+
}
53+
}

Extensions.Observable.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using System;
2+
3+
namespace Open.Dataflow
4+
{
5+
public static partial class DataFlowExtensions
6+
{
7+
class Observer<T> : IObserver<T>, IDisposable
8+
{
9+
public static IObserver<T> New(
10+
Action<T> onNext,
11+
Action<Exception> onError,
12+
Action onCompleted)
13+
=> new Observer<T>()
14+
{
15+
_onNext = onNext,
16+
_onError = onError,
17+
_onCompleted = onCompleted
18+
};
19+
20+
Action _onCompleted;
21+
Action<Exception> _onError;
22+
Action<T> _onNext;
23+
24+
25+
public void OnNext(T value)
26+
{
27+
_onNext?.Invoke(value);
28+
}
29+
30+
public void OnError(Exception error)
31+
{
32+
_onError?.Invoke(error);
33+
}
34+
35+
public void OnCompleted()
36+
{
37+
_onCompleted?.Invoke();
38+
}
39+
40+
41+
public void Dispose()
42+
{
43+
_onNext = null;
44+
_onError = null;
45+
_onCompleted = null;
46+
}
47+
}
48+
49+
public static IDisposable Subscribe<T>(this IObservable<T> observable,
50+
Action<T> onNext,
51+
Action<Exception> onError,
52+
Action onCompleted = null)
53+
=> observable.Subscribe(Observer<T>.New(onNext, onError, onCompleted));
54+
55+
public static IDisposable Subscribe<T>(this IObservable<T> observable,
56+
Action<T> onNext,
57+
Action onCompleted = null)
58+
=> observable.Subscribe(Observer<T>.New(onNext, null, onCompleted));
59+
60+
}
61+
}

0 commit comments

Comments
 (0)