如何在可观察<可观察中检测所有包含的可观察量的完成<Object>>



我希望我的API上的方法返回可观察的&lt; observable&lt; object&gt;&gt;&gt;但是我希望该方法中的代码一旦所有可观察到的东西都完成,以便它可以关闭某些内容。最好的方法是什么?

要更加明确,我正在追求这种方法:

public static <T> Observable<Observable<T>> doWhenAllComplete(
        final Observable<Observable<T>> original, Action0 action) {
  ...
}

歉意,我的答案在.NET中(System.Reactive Tag);我确定您可以翻译它!

如果您的IObservable<IObservable<Object>>source给出,则:

source.Merge()
      .Subscribe(_  => {}, /* not interested in onNext */
                 () => /* onCompleted action here, called when all complete */);

注意:如果任何流错误(导致合并的流终止),这将分解,因此您也可以这样做以吞噬单个流的错误:

source.SelectMany(x => x.Catch(Observable.Empty<Object>()))
      .Subscribe(_  => {}, /* not interested in onNext */
                 () => /* onCompleted action here, called when all complete */);

该方法的实现似乎可以在没有副作用的情况下解决问题:

public static <T> Observable<Observable<T>> doWhenAllComplete(
        final Observable<Observable<T>> original, final Action0 action) {
    return Observable.create(new OnSubscribeFunc<Observable<T>>() {
        @Override
        public Subscription onSubscribe(Observer<? super Observable<T>> o) {
            ConnectableObservable<Observable<T>> published = original
                    .publish();
            Subscription sub1 = Observable.merge(published)
                    .doOnCompleted(action).subscribe();
            Subscription sub2 = published.subscribe(o);
            Subscription sub3 = published.connect();
            return Subscriptions.from(sub1, sub2, sub3);
        }
    });
}

对我来说,这有效:

bothSources = source1.Cast<Object>().Merge (source2.Cast<Object>());

就我而言,我只需要等待2个来源,但是您可以创建一个接收源列表并将其合并的函数。

相关内容

  • 没有找到相关文章