给定以下代码:
Observable<String> obs1 = Observable.just("1", "2", "3");
Observable<String> obs2 = Observable.just("a", "b");
Observable.combineLatest(obs1,
obs2,
(s1, s2) -> s1 + ":" + s2
).subscribe(System.out::println);
我希望它输出如下内容:
1:a
2:a
3:a
3:b
但是它打印的是
3:a
3:b
为什么我的第一个可观察对象只发出最后一项?在combinellatest之前没有活动订阅,所以它应该是冷的。我怎么能确保所有项目从两个可观察的组合在一起?
这是因为您有同步源。当combineLatest
订阅它的源时,默认情况下它从中预取128个元素。这意味着您的第一个源将同步运行到完成,而第二个订阅甚至在完成之前都不会发生。由于第一个源是单独的,所以除了最后一个元素外,其他元素都被丢弃了。一旦订阅了第二个源,它将从第一个源中找到最后一个源,并将自己与它合并。
你想达到什么目的?如果需要成对组合,则使用zip
。如果你想要first和second的所有组合,使用flatMap
:
obs1.flatMap(v -> obs2.map(w -> v + ":" + w)).subscribe(...)
谢谢你的回答,akarnokd。128项的预取是非常有用的洞察力。我已经通过切换两个Observables解决了这个问题,所以
Observable<String> obs1 = Observable.just("1", "2", "3", "4");
Observable<String> obs2 = Observable.just("4", "b", "1");
Observable.combineLatest(obs1,
obs2.toList(),
(s1, list) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1)
.filter(StringUtils::isNotEmpty)
.subscribe(System.out::println);
现在的工作方式如下:
Observable<String> obs1 = Observable.just("1", "2", "3", "4");
Observable<String> obs2 = Observable.just("4", "b", "1");
Observable.combineLatest(obs2.toList(),
obs1,
(list, s1) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1)
.filter(StringUtils::isNotEmpty)
.subscribe(System.out::println);