rx-java-带有优先级的RxJava Zip



问题出在这里。我有许多异步操作,它们的结果被聚合到一个单独的操作中,并进行进一步处理。然而,并不是所有的操作都是相等的,错误处理也因操作失败而不同。

为了详细说明,假设我们有操作A、B和C。如果A失败,我们需要结束处理,但如果B或C失败,我们将继续正常处理其他操作。

目前,我们使用倒计数锁存器和大量状态管理来实现这一点,需要将近一百行代码。我想把它转移到一个基于RxJava的实现中。我的第一个想法是尝试Observable.zip算子,但它将所有可观察性视为相等,在这种情况下这不是真的。我的另一个想法是链接调用,这是有效的,但这意味着所有操作不会同时启动,从而导致更大的总体时间。

有人能指导我如何做到这一点吗?

使用.onErrorResumeNext:

Observable<T> a, b, c;
Observable.zip(
    a,
    b.onErrorResumeNext(t -> Observable.just(null)),
    c.onErrorResumeNext(t -> Observable.just(null)),
   (x, y, z) -> <your aggregation>)
...

null表示误差可观察性由您决定。您也可以使用Optional:

Observable.zip(
    a,
    b.map(x -> Optional.of(x))
     .onErrorResumeNext(t -> Observable.just(Optional.empty())),
    c.map(x -> Optional.of(x))
     .onErrorResumeNext(t -> Observable.just(Optional.empty())),
   (x, y, z) -> <your aggregation>)

如果bc对应于外部服务调用,如果它们花费的时间太长,您可能还想通过用b.timeout(5, TimeUnit.SECONDS)替换b来忽略它们。简洁啊!

相关内容

  • 没有找到相关文章

最新更新