RxJava groupBy和随后的阻塞操作(onComplete丢失?)



我把我的问题归结为以下代码片段:

Observable<Integer> numbers = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> outer = numbers.groupBy(i->i%3);
System.out.println(outer.count().toBlocking().single());

无止境地阻塞。我读了几篇文章,相信我理解了这个问题:groupedoobservables不会调用onComplete,直到它们的内部Observables也完成了。不幸的是,虽然我仍然不能得到上面的片段打印!

例如:

Observable<Integer> just = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> groupBy = just.groupBy(i->i%3);
groupBy.subscribe(inner -> inner.ignoreElements());
System.out.println(groupBy.count().toBlocking().single());

仍然不执行任何操作。我是否误解了问题?还有别的问题吗?简而言之,我怎样才能让上面的代码片段工作呢?

提前感谢,丹。

是的,您必须以某种方式使用这些组。第二个示例不起作用,因为对分组操作有两个独立的订阅。

通常,解是flatMap,但不是ignoreElements,因为它会完成,而count不会得到任何元素。相反,您可以使用takeLast(1):

Observable.just(1, 2, 3)
    .groupBy(k -> k % 3)
    .flatMap(g -> g.takeLast(1))
    .count()
    .toBlocking()
    .forEach(System.out::println);

相关内容

  • 没有找到相关文章

最新更新