我对Framework 4.0的ActionBlock实现感兴趣,因为Framework 4.0似乎不支持TPL.Dataflow。更特别地,我对构造函数的情况感兴趣,该构造函数接收Func<TInput,任务>delegate和MaxDegreeOfParallism=1的情况。
我曾想过使用反应式扩展来实现它,但我不确定如何实现;TInput>在Post上调用OnNext,并使用SelectMany和taskToObservable之类的东西,但我不知道该如何处理调度器。这是我所想的一份草稿。
public class ActionBlock<TInput>
{
private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
private readonly Subject<TInput> mQueue = new Subject<TInput>();
public ActionBlock(Func<TInput, Task> action)
{
var observable =
from item in mQueue
from _ in action(item).ToObservable()
select _;
observable.Subscribe(x => { },
OnComplete);
}
private void OnComplete()
{
mCompletion.SetResult(null);
}
public void Post(TInput input)
{
mQueue.OnNext(input);
}
public Task Completion
{
get
{
return mCompletion.Task;
}
}
public void Complete()
{
mQueue.OnCompleted();
}
}
我想也许可以使用EventLoopScheduler,但我不确定它是否适合这里,因为这是异步的。
有什么想法吗?
mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Merge(maxDegreeOfParallelism)
.Subscribe(...);
如果确实maxDegreeOfParallelism
总是1,那么只使用Concat
而不是Merge
:
mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Subscribe(...);
这是因为FromAsync
只是创建了一个冷的可观察对象,该对象在订阅之前不会运行异步操作。然后,我们使用Merge
的maxConcurrency
参数(或仅使用Concat
)来限制并发订阅的数量(从而限制正在运行的异步操作的数量)。
编辑:
由于您的目标是只拥有一个代表流完成的Task
,因此您可以使用ToTask
而不是直接订阅。CCD_ 10将订阅并返回具有最终值的CCD_。因为如果可观测值没有产生值,ToTask
就会抛出,所以我们将使用Count
来保证它产生值:
// task to mark completion
private readonly Task mCompletion;
// ...
this.mCompletion = mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Count()
.ToTask();