如何在 Mono.retry 中实现变量退避?



我觉得我错过了一些简单的基本细微差别,但由于某种原因Mono.delay()对我不起作用。我有 Mono 发出可以限制的 http 请求。我需要等待时间重试。这是它现在的样子

internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.from {
it.map { rs ->
val ex = rs.failure()
if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
println("   Waited. ${System.currentTimeMillis()}")
this
}))
} else
Mono.error(rs.failure())
}
})
}

您正在使用map,这会导致Flux<Mono<?>>返回给操作员进行重试控制。 从操作员的角度来看(Flux<?>),任何onNext都意味着"你应该重试"。 是onNext("example")还是onNext(Mono.error(err))并不重要。

不使用map,而是使用concatMap。您在函数中生成的Mono将正确导致Flux<?>,其中if的"延迟"分支产生(延迟)onNext而另一个分支产生onError

您可以使用内置的重试生成器 我建议您使用允许您定义最大重试次数和每次尝试之间的延迟的Retry.fixedDelay()。当达到最大重试次数时,您将获得一个 Mono.error()

internal fun <T> Mono<T>.retryAfter(maxRetries: Long, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofMillis(500))
.doBeforeRetry { rs: Retry.RetrySignal? -> println("Will retry " + rs?.totalRetries()) }
.doAfterRetry{ rs: Retry.RetrySignal? -> println("Has retried " + rs?.totalRetries()) }
.onRetryExhaustedThrow { _, rs ->  rs.failure()})
}

最新更新