RxJava 2.缓冲一个可观察项,直到其他可观察项完成



我有一个任务来合并两个可观察量 - 一个是来自数据库的存在的有限可观察量(称为databaseStream(,另一个是来自队列(queueStream(的无限可观察量。当客户端订阅时,我需要立即订阅queueStream,但缓冲项目直到databaseStream完全完成。发生这种情况时,我想发出queueStream的所有缓冲项。之后,必须毫不延迟地发出queueStream中的所有项目。有没有方便的方法可以用RxJava 2做到这一点?

在这里找到了灵感

Flowable<Foo> queueStream  = queueStream()
.subscribeOn(Schedulers.newThread());
Flowable<Foo> databaseStream = databaseStream()
.subscribeOn(Schedulers.newThread());
Flowable.concatEager(Arrays.asList(databaseStream, queueStream), 2 , 1000)
.distinct(identityFunction())
.blockingSubscribe(System.err::println);

最新更新