spring 反应式 WebClient,我使用了一个 API,如果响应状态为 500,我需要以指数退避重试。但是在 Mono 类中,我没有看到任何将谓词作为输入参数的重试回退。
这是我搜索的函数类型:
public final Mono<T> retryBackoff(Predicate<? super Throwable> retryMatcher, long numRetries, Duration firstBackoff)
现在我的实现如下(我没有使用回退机制重试(:
client.sendRequest()
.retry(e -> ((RestClientException) e).getStatus() == 500)
.subscribe();
你可能想看看 reactor-adddons 项目中的 reactor-extra
模块。在Maven中,您可以执行以下操作:
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
<version>3.2.3.RELEASE</version>
</dependency>
然后像这样使用它:
client.post()
.syncBody("test")
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.onlyIf(ctx -> ctx.exception() instanceof RestClientException)
.exponentialBackoff(firstBackoff, maxBackoff)
.retryMax(maxRetries))
Retry.onlyIf
现已弃用/删除。
如果有人对最新的解决方案感兴趣:
client.post()
.syncBody("test")
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.backoff(maxRetries, minBackoff).filter(ctx -> {
return ctx.exception() instanceof RestClientException && ctx.exception().statusCode == 500;
}))
值得一提的是,retryWhen
将源异常包装到RetryExhaustedException
中。如果要"恢复"源异常,可以使用reactor.core.Exceptions
实用程序:
.onErrorResume(throwable -> {
if (Exceptions.isRetryExhausted(throwable)) {
throwable = throwable.getCause();
}
return Mono.error(throwable);
})
我不确定,您使用的是哪个 Spring 版本,在 2.1.4 中我有这个:
client.post()
.syncBody("test")
.retrieve()
.bodyToMono(String.class)
.retryBackoff(numretries, firstBackoff, maxBackoff, jitterFactor);
。所以这正是你想要的,对吧?
我目前正在尝试使用 Kotlin 协程 + Spring WebFlux:
似乎以下内容不起作用:
suspend fun ClientResponse.asResponse(): ServerResponse =
status(statusCode())
.headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
.body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
.retryWhen {
Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
.exponentialBackoff(ofSeconds(1), ofSeconds(5))
.retryMax(3)
.doOnRetry { log.error("Retry for {}", it.exception()) }
)
.awaitSingle()
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
.doOnError(e -> {
errorCount.incrementAndGet();
System.out.println(e + " at " + LocalTime.now());
})
.retryWhen(Retry
.backoff(3, Duration.ofMillis(100)).jitter(0d)
.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries()))
.onRetryExhaustedThrow((spec, rs) -> rs.failure())
);
我们将记录源发出的错误时间并对其进行计数。
我们配置指数退避重试,最多 3 次尝试且无抖动。
我们还记录重试发生的时间,以及重试尝试次数(从 0 开始(。
默认情况下,将引发 Exceptions.retryExhausted 异常,最后一个 failure(( 作为原因。在这里,我们对其进行自定义以直接将原因发出为onError。