我正在使用RX合并并行进程,这些进程本身由RX管理(通过MassTransit工作流)。
任务是启动多个进程,一旦它们全部返回,然后继续前进
因此,对于4个过程,我目前有以下完美工作:
IObservable<ExportServiceResult> one = ExecuteExport(campaignId, "Export1");
IObservable<ExportServiceResult> two = ExecuteExport(campaignId, "Export2");
IObservable<ExportServiceResult> three = ExecuteExport(campaignId, "Export3");
IObservable<ExportServiceResult> four = ExecuteExport(campaignId, "Export4");
// we need all exports to complete before we can move to the next step
var allExportsForCampaign = one
.And(two)
.And(three)
.And(four)
.Then((first, second, third, fourth) =>
{
var result = new ExportBatchResult();
result.ExportResults.Add(first);
result.ExportResults.Add(second);
result.ExportResults.Add(third);
result.ExportResults.Add(fourth);
return result;
}
);
return Observable.When(allExportsForCampaign);
在上面的例子中,ExecuteExport(2)返回一个值并完成。
理想情况下,我们希望能够将该系统配置为在配置中提供n导出,并进行动态处理。
我的问题是:是否可以等待n可观察性返回一个值,然后继续使用某种形式的集合中可访问的值?
或者我是不是走错了路,还有其他方法可以使用RX来解决这个问题?
谢谢,Paul
您可以将可观测值与Merge()组合,然后聚合得到的可观测值。
IEnumerable<IObservable<ExportServiceResult>> observers;
IEnumerable<ExportServiceResult> result = new List<ExportServiceResult>();
var o = observers.Aggregate( (x,y) => x.Merge(y))
.Aggregate( result, (l,x) => l.Concat( new[]{ x }));
一种稍微干净一点的方法是:
IEnumerable<IObservable<TResult>> obs;
IList<TResult> result = Observable.Zip(obs).Wait();
如果你不想阻止它,只需去掉Wait()
,就可以有一个IObservable<IList<TResult>>
,你可以订阅或用更多的Rx创作。