ActionBlock Framework 4 rx替代方案



我对Framework 4.0的ActionBlock实现感兴趣,因为Framework 4.0似乎不支持TPL.Dataflow。更特别地,我对构造函数的情况感兴趣,该构造函数接收Func<TInput,任务>delegate和MaxDegreeOfParallism=1的情况。

我曾想过使用反应式扩展来实现它,但我不确定如何实现;TInput>在Post上调用OnNext,并使用SelectMany和taskToObservable之类的东西,但我不知道该如何处理调度器。这是我所想的一份草稿。

public class ActionBlock<TInput>
{
    private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
    private readonly Subject<TInput> mQueue = new Subject<TInput>();
    public ActionBlock(Func<TInput, Task> action)
    {
        var observable =
            from item in mQueue
            from _ in action(item).ToObservable()
            select _;
        observable.Subscribe(x => { },
            OnComplete);
    }
    private void OnComplete()
    {
        mCompletion.SetResult(null);
    }
    public void Post(TInput input)
    {
        mQueue.OnNext(input);
    }
    public Task Completion
    {
        get
        {
            return mCompletion.Task;
        }
    }
    public void Complete()
    {
        mQueue.OnCompleted();
    }
}

我想也许可以使用EventLoopScheduler,但我不确定它是否适合这里,因为这是异步的。

有什么想法吗?

mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Merge(maxDegreeOfParallelism)
    .Subscribe(...);

如果确实maxDegreeOfParallelism总是1,那么只使用Concat而不是Merge:

mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Concat()
    .Subscribe(...);

这是因为FromAsync只是创建了一个冷的可观察对象,该对象在订阅之前不会运行异步操作。然后,我们使用MergemaxConcurrency参数(或仅使用Concat)来限制并发订阅的数量(从而限制正在运行的异步操作的数量)。

编辑:

由于您的目标是只拥有一个代表流完成的Task,因此您可以使用ToTask而不是直接订阅。CCD_ 10将订阅并返回具有最终值的CCD_。因为如果可观测值没有产生值,ToTask就会抛出,所以我们将使用Count来保证它产生值:

// task to mark completion
private readonly Task mCompletion;
// ...
this.mCompletion = mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Concat()
    .Count()
    .ToTask();

相关内容

  • 没有找到相关文章

最新更新