RX来管理和加入n个子进程



我正在使用RX合并并行进程,这些进程本身由RX管理(通过MassTransit工作流)。

任务是启动多个进程,一旦它们全部返回,然后继续前进

因此,对于4个过程,我目前有以下完美工作:

IObservable<ExportServiceResult> one = ExecuteExport(campaignId, "Export1");
IObservable<ExportServiceResult> two = ExecuteExport(campaignId, "Export2");
IObservable<ExportServiceResult> three = ExecuteExport(campaignId, "Export3");
IObservable<ExportServiceResult> four = ExecuteExport(campaignId, "Export4");
// we need all exports to complete before we can move to the next step
var allExportsForCampaign = one
    .And(two)
    .And(three)
    .And(four)
    .Then((first, second, third, fourth) =>
    {
         var result = new ExportBatchResult();
         result.ExportResults.Add(first);
         result.ExportResults.Add(second);
         result.ExportResults.Add(third);
         result.ExportResults.Add(fourth);
         return result;
    }
);
return Observable.When(allExportsForCampaign);

在上面的例子中,ExecuteExport(2)返回一个值并完成。

理想情况下,我们希望能够将该系统配置为在配置中提供n导出,并进行动态处理。

我的问题是:是否可以等待n可观察性返回一个值,然后继续使用某种形式的集合中可访问的值?

或者我是不是走错了路,还有其他方法可以使用RX来解决这个问题?

谢谢,Paul

您可以将可观测值与Merge()组合,然后聚合得到的可观测值。

IEnumerable<IObservable<ExportServiceResult>> observers;
IEnumerable<ExportServiceResult> result = new List<ExportServiceResult>();
var o = observers.Aggregate( (x,y) => x.Merge(y))
    .Aggregate( result, (l,x) => l.Concat( new[]{ x }));

一种稍微干净一点的方法是:

IEnumerable<IObservable<TResult>> obs;
IList<TResult> result = Observable.Zip(obs).Wait(); 

如果你不想阻止它,只需去掉Wait(),就可以有一个IObservable<IList<TResult>>,你可以订阅或用更多的Rx创作。

相关内容

  • 没有找到相关文章

最新更新