可以将同一驱动器上的文件操作批处理到同一线程的线程调度器



我想使用Rx扩展来处理长文件绑定操作的并行化。

工作流是这样的:

  • 在多个驱动器上搜索给定的文件模式(假设每个驱动器都在单独的物理设备上)
  • 对于找到的每个匹配文件,将长文件操作排队到与同一驱动器上的其他文件相同的线程中-希望最大限度地减少随机查找。
  • 对不同驱动器上文件的
  • 操作应该排队到不同的线程中,以允许并行处理。

我的问题是:我应该使用什么Rx调度器(或调度器的组合)?

为此,认识到每个Rx可观察对象订阅是串行工作的是非常有用的。也就是说,对于单个可观察对象的单个订阅,您可以确保一个项目的onNext委托在另一个项目的onNext开始之前完成。

默认情况下,onNext委托在当前线程(调用OnNext()的线程)上执行,但您可以通过使用ObserveOn()来更改。

这意味着你应该为每个物理驱动器创建一个单独的可观察对象,并在一个单独的线程上观察每个物理驱动器。有一种方法可以做到这一点,如果你有一个可观察的操作执行是使用GroupBy()

使用哪个特定的调度器?我觉得这没什么关系。ObserveOn()似乎使用ScheduleLongRunning(),如果它是可用的,这对于最常见的调度器意味着它将创建一个新的线程来观察。

把所有这些放在一起,你的代码看起来像:
operations.GroupBy(op => op.Drive)
          .Select(o => o.ObserveOn(TaskPoolScheduler.Default))
          .Do(o => o.Subscribe(op => op.Execute()))
          .Subscribe();

(假设operations是操作类型的可观察对象,具有Drive属性和Execute()方法)

相关内容

  • 没有找到相关文章

最新更新