结合最新和冷观测



给定以下代码:

        Observable<String> obs1 = Observable.just("1", "2", "3");
        Observable<String> obs2 = Observable.just("a", "b");
        Observable.combineLatest(obs1,
                                 obs2,
                                 (s1, s2) -> s1 + ":" + s2
        ).subscribe(System.out::println);

我希望它输出如下内容:

1:a
2:a
3:a
3:b

但是它打印的是

3:a
3:b

为什么我的第一个可观察对象只发出最后一项?在combinellatest之前没有活动订阅,所以它应该是冷的。我怎么能确保所有项目从两个可观察的组合在一起?

这是因为您有同步源。当combineLatest订阅它的源时,默认情况下它从中预取128个元素。这意味着您的第一个源将同步运行到完成,而第二个订阅甚至在完成之前都不会发生。由于第一个源是单独的,所以除了最后一个元素外,其他元素都被丢弃了。一旦订阅了第二个源,它将从第一个源中找到最后一个源,并将自己与它合并。

你想达到什么目的?如果需要成对组合,则使用zip。如果你想要first和second的所有组合,使用flatMap:

obs1.flatMap(v -> obs2.map(w -> v + ":" + w)).subscribe(...)

谢谢你的回答,akarnokd。128项的预取是非常有用的洞察力。我已经通过切换两个Observables解决了这个问题,所以

Observable<String> obs1 = Observable.just("1", "2", "3", "4");
Observable<String> obs2 = Observable.just("4", "b", "1");
Observable.combineLatest(obs1,
                         obs2.toList(),
                         (s1, list) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1)
          .filter(StringUtils::isNotEmpty)
          .subscribe(System.out::println);

现在的工作方式如下:

Observable<String> obs1 = Observable.just("1", "2", "3", "4");
Observable<String> obs2 = Observable.just("4", "b", "1");
Observable.combineLatest(obs2.toList(),
                         obs1,
                         (list, s1) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1)
          .filter(StringUtils::isNotEmpty)
          .subscribe(System.out::println);

相关内容

  • 没有找到相关文章

最新更新