是否可以构建一个自定义调度程序,可以检查通过IObservable的每个元素的值,以决定在哪个线程上处理该项目?
我需要依次处理具有相同键但并行处理不同键的项。这将是有意义的,让RX做调度,而不是不得不离开观察到早于我想要的,以便分配每个值给一个线程。
您试过GroupBy
和ObserveOn
吗?
类似:
source
.GroupBy(item => item.Key)
.SelectMany(group => group
.ObserveOn(Scheduler.NewThread)
.Select(item => process(item))
)
.Subscribe(processResult => ...);
这将按键划分流,为每个键启动一个新线程,并为该键中的每个项运行process()
。