问题出在这里。我有许多异步操作,它们的结果被聚合到一个单独的操作中,并进行进一步处理。然而,并不是所有的操作都是相等的,错误处理也因操作失败而不同。
为了详细说明,假设我们有操作A、B和C。如果A失败,我们需要结束处理,但如果B或C失败,我们将继续正常处理其他操作。
目前,我们使用倒计数锁存器和大量状态管理来实现这一点,需要将近一百行代码。我想把它转移到一个基于RxJava的实现中。我的第一个想法是尝试Observable.zip
算子,但它将所有可观察性视为相等,在这种情况下这不是真的。我的另一个想法是链接调用,这是有效的,但这意味着所有操作不会同时启动,从而导致更大的总体时间。
有人能指导我如何做到这一点吗?
使用.onErrorResumeNext
:
Observable<T> a, b, c;
Observable.zip(
a,
b.onErrorResumeNext(t -> Observable.just(null)),
c.onErrorResumeNext(t -> Observable.just(null)),
(x, y, z) -> <your aggregation>)
...
用null
表示误差可观察性由您决定。您也可以使用Optional
:
Observable.zip(
a,
b.map(x -> Optional.of(x))
.onErrorResumeNext(t -> Observable.just(Optional.empty())),
c.map(x -> Optional.of(x))
.onErrorResumeNext(t -> Observable.just(Optional.empty())),
(x, y, z) -> <your aggregation>)
如果b
和c
对应于外部服务调用,如果它们花费的时间太长,您可能还想通过用b.timeout(5, TimeUnit.SECONDS)
替换b
来忽略它们。简洁啊!