弹簧反应性重试,有条件地进行指数退避


使用

spring 反应式 WebClient,我使用了一个 API,如果响应状态为 500,我需要以指数退避重试。但是在 Mono 类中,我没有看到任何将谓词作为输入参数的重试回退。

这是我搜索的函数类型:

public final Mono<T> retryBackoff(Predicate<? super Throwable> retryMatcher, long numRetries, Duration firstBackoff)

现在我的实现如下(我没有使用回退机制重试(:

client.sendRequest()
    .retry(e -> ((RestClientException) e).getStatus() == 500)
    .subscribe();

你可能想看看 reactor-adddons 项目中的 reactor-extra 模块。在Maven中,您可以执行以下操作:

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.2.3.RELEASE</version>
</dependency>

然后像这样使用它:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryWhen(Retry.onlyIf(ctx -> ctx.exception() instanceof RestClientException)
                    .exponentialBackoff(firstBackoff, maxBackoff)
                    .retryMax(maxRetries))

Retry.onlyIf现已弃用/删除。

如果有人对最新的解决方案感兴趣:

client.post()
      .syncBody("test")
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(maxRetries, minBackoff).filter(ctx -> {
          return ctx.exception() instanceof RestClientException && ctx.exception().statusCode == 500; 
      }))

值得一提的是,retryWhen将源异常包装到RetryExhaustedException中。如果要"恢复"源异常,可以使用reactor.core.Exceptions实用程序:

.onErrorResume(throwable -> {
    if (Exceptions.isRetryExhausted(throwable)) {
        throwable = throwable.getCause();
    }
    return Mono.error(throwable);
})

我不确定,您使用的是哪个 Spring 版本,在 2.1.4 中我有这个:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryBackoff(numretries, firstBackoff, maxBackoff, jitterFactor);

。所以这正是你想要的,对吧?

我目前正在尝试使用 Kotlin 协程 + Spring WebFlux:

似乎以下内容不起作用:

suspend fun ClientResponse.asResponse(): ServerResponse =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
        .retryWhen { 
            Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error("Retry for {}", it.exception()) }
        )
        .awaitSingle()
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
        .doOnError(e -> { 
            errorCount.incrementAndGet();
            System.out.println(e + " at " + LocalTime.now());
        })
        .retryWhen(Retry
                .backoff(3, Duration.ofMillis(100)).jitter(0d) 
                .doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) 
                .onRetryExhaustedThrow((spec, rs) -> rs.failure()) 
        );

我们将记录源发出的错误时间并对其进行计数。

我们配置指数退避重试,最多 3 次尝试且无抖动。

我们还记录重试发生的时间,以及重试尝试次数(从 0 开始(。

默认情况下,将引发 Exceptions.retryExhausted 异常,最后一个 failure(( 作为原因。在这里,我们对其进行自定义以直接将原因发出为onError。

相关内容

  • 没有找到相关文章

最新更新