RxJava:将 Observable 与 Completable 合并不起作用



我有一个可观察量,在某些时候必须将内容写入缓存 - 我们希望等待写入完成,然后再完成对可观察量的整个操作(出于报告目的(。

出于测试目的,缓存写入 Completable 如下所示:

Completable.create(
emitter ->
new Thread(
() -> {
try {
Thread.sleep(2000);
doSomething();
emitter.onComplete();
} catch (InterruptedException e) {
e.printStackTrace();
}
})
.start());

由于我有几个缓存写入,我尝试将它们合并到容器类中:

public class CacheInsertionResultsTracker {
private Completable cacheInsertResultsCompletable;
public CacheInsertionResultsTracker() {
this.cacheInsertResultsCompletable = Completable.complete();
}
public synchronized void add(Completable cacheInsertResult) {
this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult);
}
public Completable getCompletable() {
return this.cacheInsertResultsCompletable;
}
}

我尝试通过以下方式将其与可观察合并:

CacheInsertionResultsTracker tracker = new ...;
observable
.doOnNext(next->tracker.add(next.writeToCache(...)))
.mergeWith(Completable.defer(()->tracker.getCompletable()))
.subscribe(
// on next
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);

我怎样才能确保在 finishWithSuccess 被称为 doSomething 时完成? 问题是每次我添加新引用时都会更新 Completable 引用,并且它发生在 mergeWith 运行之后......

似乎适用于我们用例的解决方案是使用 concatWith + defer:

observable
.doOnNext(next->tracker.add(next.writeToCache(...)))
.concatWith(Completable.defer(()->tracker.getCompletable()))
.subscribe(
// on next
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);

Concat 确保对 Completable 的订阅仅在 Observable 完成后发生,并将获取最终的 Completable 推迟到此订阅(因此所有对象都已添加到跟踪器中(。

根据注释,您可以将可完成缓存替换为ReplaySubject<Completable>,执行一些超时以检测不活动并结束可观察序列。

ReplaySubject<Completable> cache = ReplaySubject.create();
cache.onNext(completable);
observable.mergeWith(
cache.flatMapCompletable(v -> v)
.timeout(10, TimeUnit.MILLISECONDS, Completable.complete())
)

编辑:

更新后的示例意味着您希望运行Completables 以响应主observable中的项目,隔离到该序列,并等待所有项目完成。这是flatMap的典型用例:

observable.flatMap(
next -> next.writeToCache(...).andThen(Observable.just(next))
)
.subscribe(
this::logNextElement
// on error
this::finishWithError
// on complete
this::finishWithSuccess
);

相关内容

  • 没有找到相关文章

最新更新