我有一个可观察量,在某些时候必须将内容写入缓存 - 我们希望等待写入完成,然后再完成对可观察量的整个操作(出于报告目的(。
出于测试目的,缓存写入 Completable 如下所示:
Completable.create(
emitter ->
new Thread(
() -> {
try {
Thread.sleep(2000);
doSomething();
emitter.onComplete();
} catch (InterruptedException e) {
e.printStackTrace();
}
})
.start());
由于我有几个缓存写入,我尝试将它们合并到容器类中:
public class CacheInsertionResultsTracker {
private Completable cacheInsertResultsCompletable;
public CacheInsertionResultsTracker() {
this.cacheInsertResultsCompletable = Completable.complete();
}
public synchronized void add(Completable cacheInsertResult) {
this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult);
}
public Completable getCompletable() {
return this.cacheInsertResultsCompletable;
}
}
我尝试通过以下方式将其与可观察合并:
CacheInsertionResultsTracker tracker = new ...;
observable
.doOnNext(next->tracker.add(next.writeToCache(...)))
.mergeWith(Completable.defer(()->tracker.getCompletable()))
.subscribe(
// on next
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);
我怎样才能确保在 finishWithSuccess 被称为 doSomething 时完成? 问题是每次我添加新引用时都会更新 Completable 引用,并且它发生在 mergeWith 运行之后......
似乎适用于我们用例的解决方案是使用 concatWith + defer:
observable
.doOnNext(next->tracker.add(next.writeToCache(...)))
.concatWith(Completable.defer(()->tracker.getCompletable()))
.subscribe(
// on next
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);
Concat 确保对 Completable 的订阅仅在 Observable 完成后发生,并将获取最终的 Completable 推迟到此订阅(因此所有对象都已添加到跟踪器中(。
根据注释,您可以将可完成缓存替换为ReplaySubject<Completable>
,执行一些超时以检测不活动并结束可观察序列。
ReplaySubject<Completable> cache = ReplaySubject.create();
cache.onNext(completable);
observable.mergeWith(
cache.flatMapCompletable(v -> v)
.timeout(10, TimeUnit.MILLISECONDS, Completable.complete())
)
编辑:
更新后的示例意味着您希望运行Completable
s 以响应主observable
中的项目,隔离到该序列,并等待所有项目完成。这是flatMap
的典型用例:
observable.flatMap(
next -> next.writeToCache(...).andThen(Observable.just(next))
)
.subscribe(
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);