同步多个可观察的流



我对使用反应性扩展很新,所以这可能是一个新手问题,但是我有以下方案:

i从数据库中获取3个(不同类型的)列表,并填充视图模型。但是,当所有列表都完成加载时,我想协调订阅以触发某些内容。反应性扩展是可能的,还是我以错误的方式思考?像这样的代码:

GetCustomers()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetCountries()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetTransports()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);

您可以尝试使用可观察的连接。这样的东西:

var plan =
    Observable.Start(() => GetCountries())
        .And(Observable.Start(() => GetCustomers()))
        .And(Observable.Start(() => GetTransports()))
        .Then((countries, customers, transports)
            => new { countries, customers, transports });
var query =
    Observable.When(new [] { plan });
query
    .Subscribe(cct =>
    {
        View.Model.AddRange(cct.countries);
        View.Model.AddRange(cct.customers);
        View.Model.AddRange(cct.transports);
    });

它并行运行,您将在最后获得所有结果。

我不确定为什么您会将已经同步的枚举更改为可观察的,但是在RX成语中,您可以:

Observable.Merge(Add(GetCustomers()), Add(GetCountries())..., Add(GetTransports()))
           .Subscribe(() => { }, Completed);

添加可能是:

    private IObservable<Unit> Add<T>(IObservable<T> o)
    {
        return o.Buffer(20)
                .ObserveOn(SynchronizationContext.Current)
                .Do(View.Model.AddRange)
                .Select(_ => Unit.Default);
    }

相关内容

  • 没有找到相关文章