我有一个返回HOT Observable<Integer>
的方法。在订阅时,发出一个值,然后其他Integers
流将(最终)到来。
我有10个这些Observable<Integer>
(不同的实例)。我想做的,用英语写下来是:
- 如果所有这些
10
可观测到的0
作为第一个Integer
- then mergeWith a
Fallback set of observable
这个逻辑背后的原因是,我想检查我的db(前10个Observable)的一部分,如果db包含数据,那么坚持第一组10个Observable。但是,如果数据库当前在这些位置不包含数据(整数0),则使用"数据库的一部分"来包含数据。
您可以获取第一个元素(使用take(1)
)并从每个可观察对象中过滤0
,然后计算项目数量(使用count()
)。
"棘手"的部分是你的可观测物是热的。您必须确保没有遗漏第一个元素。
Observable.just(obs1, ob2, ..., obs10)
.flatMap(obs -> obs.take(1).filter(i -> i == 0))
.count()
.filter(count -> count == 10)
.isEmpty()
.flatMap(isEmpty -> {
if(isEmpty) {
return Observable.empty();
} else {
return fallbackObservable();
}
}).subscribe();