有几篇关于这方面的文章,我正在处理。。。但我想知道如何同时为我的Observable订阅设置最大数量的Task线程。
我有以下内容来并行异步保存日志条目:
private BlockingCollection<ILogEntry> logEntryQueue;
和
logEntryQueue = new BlockingCollection<ILogEntry>();
logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);
要安排我的保存。。。但是我如何指定调度程序一次使用的最大线程数呢?
这不是Observable的函数,而是Scheduler的函数。Observable定义what,scheduler定义where。
您需要传入一个自定义调度程序。一个简单的方法是子类化TaskScheduler并覆盖"MaximumConcurrentLevel"属性。
http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx
事实上,我在MSDN上找到了一个这样的示例:
http://msdn.microsoft.com/en-us/library/ee789351.aspx
编辑:您询问了如何从TaskScheduler转到IScheduler。另一位开发人员刚刚给了我一点信息:
var ischedulerForRx = new TaskPoolScheduler
(
new TaskFactory
(
//This is your custom scheduler
new LimitedConcurrencyLevelTaskScheduler(1)
)
);
如果您将您的"工作"创建为延迟执行的IObservable<T>
(即,他们希望在订阅之前做任何事情),则可以使用接受多个最大并发订阅的Merge
重载:
ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize();
queue
.Select(item => StartWork(item))
.Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes
.Subscribe();
// To enqueue:
synchronizedQueue.OnNext(new QueueItem());