我把我的问题归结为以下代码片段:
Observable<Integer> numbers = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> outer = numbers.groupBy(i->i%3);
System.out.println(outer.count().toBlocking().single());
无止境地阻塞。我读了几篇文章,相信我理解了这个问题:groupedoobservables不会调用onComplete,直到它们的内部Observables也完成了。不幸的是,虽然我仍然不能得到上面的片段打印!
例如:
Observable<Integer> just = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> groupBy = just.groupBy(i->i%3);
groupBy.subscribe(inner -> inner.ignoreElements());
System.out.println(groupBy.count().toBlocking().single());
仍然不执行任何操作。我是否误解了问题?还有别的问题吗?简而言之,我怎样才能让上面的代码片段工作呢?
提前感谢,丹。
是的,您必须以某种方式使用这些组。第二个示例不起作用,因为对分组操作有两个独立的订阅。
通常,解是flatMap
,但不是ignoreElements
,因为它会完成,而count
不会得到任何元素。相反,您可以使用takeLast(1)
:
Observable.just(1, 2, 3)
.groupBy(k -> k % 3)
.flatMap(g -> g.takeLast(1))
.count()
.toBlocking()
.forEach(System.out::println);