可连接观察者:观察所有订阅者的终止



我有一个具有多个订阅者的可连接观察者。

每个订阅者计算一些业务逻辑。例如,其中一个订阅者在每次onNext调用时将结果存储在数据库中,其他订阅者在内存中积累结果,当onCompleted调用时将结果写入文件。我想知道他们什么时候都完成了他们的工作,所以我可以继续做其他的事情(与其他可连接的观察者聚合,从数据库读取输出数据等)。

这就是我观察终止的方式。它之所以有效,是因为订阅者和观察者在同一个线程中执行。

public Observable<Boolean> observeTermination() {
    return Observable.defer(() -> {
        try {
            start();
            return Observable.just(true);
        } catch (RuntimeException e) {
            return Observable.just(false);
        }
    });
}
void start() {
    Observable<List<Foo>> fooBatchReaderObservable = fooBatchReader.createObservable(BATCH_SIZE);
    ConnectableObservable<List<Foo>> connectableObservable = fooBatchReaderObservable.publish();
    subscribers.forEach(s -> connectableObservable.subscribe(s));
    connectableObservable.connect();
}

所以当observeTermination被调用时,我不想在start方法中执行逻辑,但只有当有人订阅它时才执行逻辑。有没有办法让观察变得更好?

好吧,都不好。问题是,我需要在某个地方调用connect可观察对象,并返回boolean结果作为终止的指示。

不是一个合适的答案,但它需要适当的空间来解释。如果你能处理可观察对象而不是订阅者,那就容易多了;这给了你更大的灵活性来组合它们;

如果你有components:

Collection<Function<Observable<T>, Observable<?>>> components:
Observable<T> tObs = ... .publish().autoConnect(components.size());
Observable
.from(components)
.flatMap(component -> component.apply(tObs))
.ignoreElements()
.doOnTerminate() // or .defaultIfEmpty(...), or .switchIfEmpty(...)
.subscribe(...);

事实上,我认为你根本不应该在这里订阅,只需要创建一个可观察对象并返回它,让它在你代码的其他部分可以用于组合

相关内容

  • 没有找到相关文章

最新更新