我想使用Rx扩展来处理长文件绑定操作的并行化。
工作流是这样的:
- 在多个驱动器上搜索给定的文件模式(假设每个驱动器都在单独的物理设备上)
- 对于找到的每个匹配文件,将长文件操作排队到与同一驱动器上的其他文件相同的线程中-希望最大限度地减少随机查找。 对不同驱动器上文件的
- 操作应该排队到不同的线程中,以允许并行处理。
我的问题是:我应该使用什么Rx调度器(或调度器的组合)?
为此,认识到每个Rx可观察对象订阅是串行工作的是非常有用的。也就是说,对于单个可观察对象的单个订阅,您可以确保一个项目的onNext
委托在另一个项目的onNext
开始之前完成。
默认情况下,onNext
委托在当前线程(调用OnNext()
的线程)上执行,但您可以通过使用ObserveOn()
来更改。
这意味着你应该为每个物理驱动器创建一个单独的可观察对象,并在一个单独的线程上观察每个物理驱动器。有一种方法可以做到这一点,如果你有一个可观察的操作执行是使用GroupBy()
。
使用哪个特定的调度器?我觉得这没什么关系。ObserveOn()
似乎使用ScheduleLongRunning()
,如果它是可用的,这对于最常见的调度器意味着它将创建一个新的线程来观察。
operations.GroupBy(op => op.Drive)
.Select(o => o.ObserveOn(TaskPoolScheduler.Default))
.Do(o => o.Subscribe(op => op.Execute()))
.Subscribe();
(假设operations
是操作类型的可观察对象,具有Drive
属性和Execute()
方法)