有没有更好的方法来使用 RxJava 获取两个可观察量的交集



使用RxJava,我有一个源Observable,它发出许多项目,我希望与另一个发出相同类型的Observable相交。 在经历了许多选项之后,似乎最连贯的结构方式是这样的:

Observable<String> source = ...emits 20 items
Observable.create(subscriber -> {
    source
        .buffer(5)
        .subscribe(things -> {
            tocheck.getMatches(things) //emits 3 matches
                .subscribe(subscriber::onNext, subscriber::onError, () -> {});
        }, subscriber::onError, subscriber::onCompleted));

此处的预期输出是,当我订阅生成的可观察对象时,我发出了 12 个项目。 由于 getMatch 的合约,我要求缓冲结果。

从表面上看,这似乎可行,但似乎不是最干净的方式。 过滤器似乎不适用于这里,因为出于性能原因,我无法对每个项目运行交叉检查。 我玩弄了使用flatMap,但是getMatchs可观察量完成了流,而不是来自源可观察量的完成通知。

有没有更好的方法来构建它?

编辑:为了澄清这种代码风格发生了什么:

Observable<String> source = ...emits 20 items
source
    .buffer(5)
    .flatMap(this::getMatches);  //final observable would emit a total of 12 items

这显然更干净,但是当我添加一些日志记录时(假设数据大小与原始代码段相同:

source
    .doOnEach(notification -> {
        log.trace("Processing {}", notification.getValue());
    })
    .buffer(5)
    .flatMap(this::getMatches)
    .doOnEach(notification -> {
        log.trace("Processing after match {}", notification.getValue());
    });
我得到了 20 个"处理"日志

实例,然后奇怪的是,"处理之后"中只有几行日志(当我期望 12 个时)。 它似乎比应有的更早地调用完成。 也许我的结构错误?

所以看起来AndroidEx是正确的。我正在使用 Redis 生菜反应式 API,它看起来行为不合适。 上面添加的代码片段是构建两个可观察量交集的正确方法。

最新更新