多次 Rxjava 重试何时处理不同的错误



在我的项目中,我有一个方法,可以以相同的方式订阅每个可观察量。我正在尝试通过在其上放置重试时间选项来增强它。

为了避免大的重试何时处理不同的错误,我设计了这个逻辑

泛型的 RetryFunction 类

abstract class RxStreamLimitedRetryFunction(private val nbOfAttempts: Int, val streamId: String) : Function<Observable<Throwable>, Observable<*>> {
override fun apply(t: Observable<Throwable>): Observable<*> {
return t.flatMap {
if (shouldRetry(it)) Observable.just(it)
else Observable.empty()
}.zipWith(Observable.range(0, nbOfAttempts + 1), BiFunction<Throwable, Int, Int> { throwable, attempts ->
if (attempts == nbOfAttempts) {
throw RetryMaxAttemptsException(nbOfAttempts)
} else {
Log.d("Retry nb ${attempts + 1} out of $nbOfAttempts for stream with id : $streamId with error ${throwable.message} ")
attempts
}
}).flatMap { onRetry(it) }
}
abstract fun onRetry(attempsNb: Int): Observable<*>
abstract fun shouldRetry(throwable: Throwable): Boolean

}

两个子类,每个子类在错误后具有不同的重试尝试

class RxStream404Retry(streamId: String) : RxStreamLimitedRetryFunction(4, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return true
}  } 
class RxStream500Retry(streamId: String) : RxStreamLimitedRetryFunction(2, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return false
}}
  • 应该重试方法在此示例中得到简化

所有这些重试函数都可以在重试函数列表中找到它们的方式,该函数使用可观察变压器设置为通过重试每个函数的可观察量

class RetryComposer : ObservableTransformer<RxStreamSuccess, RxStreamSuccess> {
val retryFunctionList = arrayListOf(RxStream404Retry("Test1"),
RxStream500Retry("Test2")
)

override fun apply(upstream: Observable<RxStreamSuccess>): ObservableSource<RxStreamSuccess> {
retryFunctionList.forEach {
upstream.retryWhen(it)
}
return upstream
}}

我的订阅链如下所示:

streamCache[stremId] =   observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { listener.onLoading() }
.compose(RetryComposer())            
.doOnComplete {
Log.d(" Retry onComplete")
streamCache.remove(stremId) }
.subscribe(
{ result -> listener.onSuccess(result) },
{ throwable ->
streamCache.remove(stremId)
}
)

当我使用进入错误的可观察量进行测试时,没有任何反应,我的 RxStream404重试没有触发。您不能在每个可观察对象时进行多次重试吗?

非常感谢

我认为问题来自:

retryFunctionList.forEach {
upstream.retryWhen(it) <- this returns a new Observable that is not attached to any subscriber
}

此代码等效于:

Observable obs1 = upstream.retryWhen(RxStream404Retry("Test1"))
Observable obs2 = upstream.retryWhen(RxStream500Retry("Test2"))
return upstream

因此,这些可观察量不被主 Rx 链的订阅者订阅。 您可能已经查看了该运算符的amb()运算符(http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#amb-java.lang.Iterable-(

您可以尝试以下操作:

return upstream.retryWhen(amb(retryFunctionList)) // pseudo code

这将是粗略的想法。

最新更新