限制使用 Monix Observable 的 .onErrorRestartIf?



Monix 可观察量具有 API.onErrorRestartIf(f: Throwable => Boolean).onErrorRestart(times: Int)。如何指定它应该重试执行.onErrorRestartIf的最大次数?

您可以基于onErrorHandleWith构建自己的循环:

def retryLimited[A](fa: Observable[A], maxRetries: Int)
(p: Throwable => Boolean): Observable[A] = {
// If we have no retries left, return the source untouched
if (maxRetries <= 0) fa else
fa.onErrorHandleWith { err =>
// If predicate holds, do recursive call
if (p(err)) 
retryLimited(fa, maxRetries - 1)(p)
else 
Observable.raiseError(err)
}
}

如果你不喜欢简单的函数(我不喜欢(,你可以随时公开一些扩展方法作为替代方案:

implicit class ObservableExtensions[A](val self: Observable[A]) 
extends AnyVal {
def onErrorRetryLimited(maxRetries: Int)
(p: Throwable => Boolean): Observable[A] = 
retryLimited(self, maxRetries)(p)
}

请注意,@JVS的答案在精神上是可以的,但可能会有问题,因为它保持共享的可变状态,这对于冷可观察量来说是不行的。因此,请注意,如果您执行以下操作会发生什么:

val source = Observable.suspend { 
if (Random.nextInt() % 10 != 0)
Observable.raiseError(new RuntimeException("dummy"))
else
Observable(1, 2, 3)
} 
val listT = source
.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))
.toListL
listT.runAsync // OK
listT.runAsync // Ooops, shared state, we might not have retries left

警惕 Observable 运算符中的可变共享状态。当然,您可以像这样工作,但您必须意识到其中的危险:-(

警告:这使用共享可变状态,对于冷可观察量可能不正确。请看亚历山德鲁的回答。

定义一个函数来执行此操作:

def limitedRetries(maxRetries: AtomicInt, shouldRetryOnException: Throwable => Boolean): Throwable => Boolean = 
ex => maxRetries.decrementAndGet() > 0 && shouldRetryOnException(ex)

并在onErrorRestartIf中使用此功能

.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart))

仅供参考,在这里使用了Monix AtomicInt...

import monix.execution.atomic.AtomicInt

相关内容

  • 没有找到相关文章

最新更新