我有一个这样的界面:
interface IProcessor{
IObservable<Item> Process(Item item);
}
我有一个工作数组:
IProcessor[] _workers = ....
我想通过所有的工人传递一个项目:
var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
int index = i;
ret = ret
.SelectMany(r => _workers[index].Process(r))
;
}
return ret;
我不太满意这个样子——有更干净的方法吗?
这个适合我:
IObservable<Item> ret = _workers.Aggregate(
Observable.Return(item),
(rs, w) =>
from r in rs
from p in w.Process(r)
select p);
请记住,这种可观察对象的聚合——无论是在你的问题还是在我的回答中——都可能很快导致内存问题(即堆栈溢出)。在我的测试中,我可以让400个工人工作,但500个导致崩溃。
你最好改变你的IProcessor
不使用可观察对象,并实现你的可观察对象如下:
interface IProcessor{
Item Process(Item item);
}
var f =
_workers.Aggregate<IProcessor, Func<Item, Item>>(
i => i,
(fs, p) => i => p.Process(fs(i)));
var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);
使用这种方法,我可以在堆栈溢出之前获得超过20,000个嵌套的工人,并且结果几乎是即时的。
可能是这样的:?
var item = new Item();
_workers
.ToObservable()
.SelectMany(worker => worker.Process(item))
.Subscribe(item => ...);
我假设工人可以并行地处理项目。
注:如果需要顺序处理,则为
var item = new Item();
_workers
.ToObservable()
.Select(worker => worker.Process(item))
.Concat()
.Subscribe(item => ...);