在Spring WebFlux Reactive代码中使用Mono.Retry()实现Retry机制



Am使用Java 8和Spring WebFlux调用外部基于REST的服务器,该服务器也有备份服务器。

目前,如果在调用服务器时出现故障,它会在没有重试机制的情况下访问备份服务器:

public class ServiceImpl {
@Autowired
MyRestClient myRestClient;
@Autowired
MyRestClient myRestClientBackup;
public Mono<ResponseOutput> getResponseOutput(ResponseInput responseInput) {
return Mono.just(responseInput)
.flatMap(input -> {
Mono<ResponseOutput> mono = myRestClient.post(input)
.doOnSuccess(responseOutput -> {
log.info("Successfully got responseOutput={}", responseOutput);
});
return mono;
})
.onErrorResume(e -> {
log.warn("Call to server failed, falling back to backup server...");
Mono<ResponseOutput> mono = myRestClientBackup.post(responseInput)
.doOnSuccess(responseOutput ->
log.info("Successfully got backup responseOutput={}", responseOutput));
return mono;
});
}
}

Am试图实现一种重试机制,在将application.yml中的numRetries属性设置为特定数字时,应该会发生这种情况:

例如,如果numRetries = 2

使用MyRestClient两次命中原始服务器(自numRetries = 2以来(,如果未能命中备份服务器。


这将numRetries配置属性设置为静态值:

application.yml:

client:
numRetries: 2;

application.yml文件加载配置属性的类:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public ClientConfig {
@Value("${client.numRetries:0}")
private int numRetries;
// Getters and setters omitted for brevity
}

ClientConfigMyRestClient:使用

public class MyRestClient {
@Autowired 
ClientConfig   config;
// Getters and setters omitted for brevity
}

MyRestClient获得numRetries的值后,如何在调用备份服务器之前将ServiceImpl.getResponseOutput()内的原始实现更改为使用numRetries来访问原始服务器?正如您所看到的,这一切最初都是使用Java 8 lambda和流完成的。。。

找到了这个注释@Retryable(value = RestClientException.class),但不知道如何指定else if numRetries is used to call the backup server(伪代码(?

是否有lambda函数表示要在:中使用的numRetries

return Mono.just(responseInput)
.flatMap(input -> {

如何使用Mono.retry()运算符?

此外,此代码中的reactor版本似乎不赞成使用Mono.retry().retryBackOff()方法?

我如何使用引用numRetries(它将是int(的值来指示继续重试(在失败时(,然后在超过numRetries时调用另一个备份服务器.onErrorResume()

使用Mono中的retry(int)运算符,如下所示:

Mono.just(responseInput)
.flatMap(input -> myRestClient.post(input)
.retry(getNumRetries()) // this is the important part
.doOnSuccess(responseOutput -> log.info("Successfully got responseOutput={}", responseOutput)))
.onErrorResume(e -> {
log.warn("Call to server failed, falling back to backup server...");
return myRestClientBackup.post(responseInput)
.doOnSuccess(responseOutput -> log.info("Successfully got backup responseOutput={}", responseOutput));
});

最新更新