我觉得我错过了一些简单的基本细微差别,但由于某种原因Mono.delay()
对我不起作用。我有 Mono 发出可以限制的 http 请求。我需要等待时间重试。这是它现在的样子
internal fun <T> Mono<T>.retryAfter(maxRetries: Int, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.from {
it.map { rs ->
val ex = rs.failure()
if (ex is RetryAfterException && rs.totalRetries() < maxRetries) {
println("*** API throttling on call to $uri. Will wait for ${ex.delay}. Retry count: ${rs.totalRetries()}. ms ${System.currentTimeMillis()}")
Mono.delay(ex.delay.plusMillis(500), Schedulers.parallel()).then(Mono.defer({
println(" Waited. ${System.currentTimeMillis()}")
this
}))
} else
Mono.error(rs.failure())
}
})
}
您正在使用map
,这会导致Flux<Mono<?>>
返回给操作员进行重试控制。 从操作员的角度来看(Flux<?>
),任何onNext
都意味着"你应该重试"。 是onNext("example")
还是onNext(Mono.error(err))
并不重要。
不使用map
,而是使用concatMap
。您在函数中生成的Mono
将正确导致Flux<?>
,其中if
的"延迟"分支产生(延迟)onNext
而另一个分支产生onError
。
您可以使用内置的重试生成器 我建议您使用允许您定义最大重试次数和每次尝试之间的延迟的Retry.fixedDelay()
。当达到最大重试次数时,您将获得一个 Mono.error()
internal fun <T> Mono<T>.retryAfter(maxRetries: Long, uri: URI): Mono<T> {
// `this` is instance of Mono<T>
return this.retryWhen(Retry.fixedDelay(maxRetries, Duration.ofMillis(500))
.doBeforeRetry { rs: Retry.RetrySignal? -> println("Will retry " + rs?.totalRetries()) }
.doAfterRetry{ rs: Retry.RetrySignal? -> println("Has retried " + rs?.totalRetries()) }
.onRetryExhaustedThrow { _, rs -> rs.failure()})
}