具有响应式扩展的异步队列处理



有几篇关于这方面的文章,我正在处理。。。但我想知道如何同时为我的Observable订阅设置最大数量的Task线程。

我有以下内容来并行异步保存日志条目:

private BlockingCollection<ILogEntry> logEntryQueue;

 logEntryQueue = new BlockingCollection<ILogEntry>();
 logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);

要安排我的保存。。。但是我如何指定调度程序一次使用的最大线程数呢?

这不是Observable的函数,而是Scheduler的函数。Observable定义what,scheduler定义where

您需要传入一个自定义调度程序。一个简单的方法是子类化TaskScheduler并覆盖"MaximumConcurrentLevel"属性。

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

事实上,我在MSDN上找到了一个这样的示例:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

编辑:您询问了如何从TaskScheduler转到IScheduler。另一位开发人员刚刚给了我一点信息:

var ischedulerForRx = new TaskPoolScheduler
(
    new TaskFactory
    (
        //This is your custom scheduler
        new LimitedConcurrencyLevelTaskScheduler(1)
    )
);

如果您将您的"工作"创建为延迟执行的IObservable<T>(即,他们希望在订阅之前做任何事情),则可以使用接受多个最大并发订阅的Merge重载:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize();
queue
    .Select(item => StartWork(item))
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes
    .Subscribe();
// To enqueue:
synchronizedQueue.OnNext(new QueueItem());

相关内容

  • 没有找到相关文章