使用RxJava,我有一个源Observable,它发出许多项目,我希望与另一个发出相同类型的Observable相交。 在经历了许多选项之后,似乎最连贯的结构方式是这样的:
Observable<String> source = ...emits 20 items
Observable.create(subscriber -> {
source
.buffer(5)
.subscribe(things -> {
tocheck.getMatches(things) //emits 3 matches
.subscribe(subscriber::onNext, subscriber::onError, () -> {});
}, subscriber::onError, subscriber::onCompleted));
此处的预期输出是,当我订阅生成的可观察对象时,我发出了 12 个项目。 由于 getMatch 的合约,我要求缓冲结果。
从表面上看,这似乎可行,但似乎不是最干净的方式。 过滤器似乎不适用于这里,因为出于性能原因,我无法对每个项目运行交叉检查。 我玩弄了使用flatMap,但是getMatchs可观察量完成了流,而不是来自源可观察量的完成通知。
有没有更好的方法来构建它?
编辑:为了澄清这种代码风格发生了什么:
Observable<String> source = ...emits 20 items
source
.buffer(5)
.flatMap(this::getMatches); //final observable would emit a total of 12 items
这显然更干净,但是当我添加一些日志记录时(假设数据大小与原始代码段相同:
source
.doOnEach(notification -> {
log.trace("Processing {}", notification.getValue());
})
.buffer(5)
.flatMap(this::getMatches)
.doOnEach(notification -> {
log.trace("Processing after match {}", notification.getValue());
});
我得到了 20 个"处理"日志实例,然后奇怪的是,"处理之后"中只有几行日志(当我期望 12 个时)。 它似乎比应有的更早地调用完成。 也许我的结构错误?
所以看起来AndroidEx是正确的。我正在使用 Redis 生菜反应式 API,它看起来行为不合适。 上面添加的代码片段是构建两个可观察量交集的正确方法。