Rx中的多重选择



我有一个这样的界面:

interface IProcessor{
    IObservable<Item> Process(Item item);
}

我有一个工作数组:

IProcessor[] _workers = ....

我想通过所有的工人传递一个项目:

var ret = Observable.Return(item);
for (var i = 0; i < _workers.Length; i++)
{
    int index = i;
    ret = ret
        .SelectMany(r => _workers[index].Process(r))
    ;
}
return ret;

我不太满意这个样子——有更干净的方法吗?

这个适合我:

IObservable<Item> ret = _workers.Aggregate(
    Observable.Return(item),
    (rs, w) =>
        from r in rs
        from p in w.Process(r)
        select p);

请记住,这种可观察对象的聚合——无论是在你的问题还是在我的回答中——都可能很快导致内存问题(即堆栈溢出)。在我的测试中,我可以让400个工人工作,但500个导致崩溃。

你最好改变你的IProcessor不使用可观察对象,并实现你的可观察对象如下:

interface IProcessor{
    Item Process(Item item);
}
var f =
    _workers.Aggregate<IProcessor, Func<Item, Item>>(
            i => i,
            (fs, p) => i => p.Process(fs(i)));
var ret = Observable.Start(() => f(item), Scheduler.ThreadPool);

使用这种方法,我可以在堆栈溢出之前获得超过20,000个嵌套的工人,并且结果几乎是即时的。

可能是这样的:?

var item = new Item();
_workers
  .ToObservable()
  .SelectMany(worker => worker.Process(item))
  .Subscribe(item => ...);

我假设工人可以并行地处理项目。

注:如果需要顺序处理,则为

var item = new Item();
_workers
  .ToObservable()
  .Select(worker => worker.Process(item))
  .Concat()
  .Subscribe(item => ...);

相关内容

  • 没有找到相关文章

最新更新