我有一个具有多个订阅者的可连接观察者。
每个订阅者计算一些业务逻辑。例如,其中一个订阅者在每次onNext
调用时将结果存储在数据库中,其他订阅者在内存中积累结果,当onCompleted
调用时将结果写入文件。我想知道他们什么时候都完成了他们的工作,所以我可以继续做其他的事情(与其他可连接的观察者聚合,从数据库读取输出数据等)。
这就是我观察终止的方式。它之所以有效,是因为订阅者和观察者在同一个线程中执行。
public Observable<Boolean> observeTermination() {
return Observable.defer(() -> {
try {
start();
return Observable.just(true);
} catch (RuntimeException e) {
return Observable.just(false);
}
});
}
void start() {
Observable<List<Foo>> fooBatchReaderObservable = fooBatchReader.createObservable(BATCH_SIZE);
ConnectableObservable<List<Foo>> connectableObservable = fooBatchReaderObservable.publish();
subscribers.forEach(s -> connectableObservable.subscribe(s));
connectableObservable.connect();
}
所以当observeTermination
被调用时,我不想在start
方法中执行逻辑,但只有当有人订阅它时才执行逻辑。有没有办法让观察变得更好?
好吧,都不好。问题是,我需要在某个地方调用connect
可观察对象,并返回boolean
结果作为终止的指示。
不是一个合适的答案,但它需要适当的空间来解释。如果你能处理可观察对象而不是订阅者,那就容易多了;这给了你更大的灵活性来组合它们;
如果你有components
:
Collection<Function<Observable<T>, Observable<?>>> components:
Observable<T> tObs = ... .publish().autoConnect(components.size());
Observable
.from(components)
.flatMap(component -> component.apply(tObs))
.ignoreElements()
.doOnTerminate() // or .defaultIfEmpty(...), or .switchIfEmpty(...)
.subscribe(...);
事实上,我认为你根本不应该在这里订阅,只需要创建一个可观察对象并返回它,让它在你代码的其他部分可以用于组合