发生错误时,通量间隔停止执行



我使用Flux.interval来运行计划任务

Flux
.interval(Duration.ofSeconds(30))
.doOnNext(duration -> log.info("Process has been started"))
.flatMap(duration -> customService.process())
.onErrorResume(throwable -> Flux.empty())
.subscribe();

这里的事情,如果一个异常发生在任何地方,onErrorResume已被调用,但调度程序停止工作-它不再调用customService.process()

我需要这个继续调用方法每30秒即使有任何错误在任何地方。有人能帮我吗?

在您的情况下,.onErrorResume(throwable -> Flux.empty())将完成没有错误信号的流程,但原始Flux将被取消。

你有几个选项:

处理错误

处理process级别的错误。在这种情况下,错误将被"忽略"。初始Flux将继续。从技术上讲,这与在命令式编程中将process方法包装在try-catch中并忽略错误相同。

Flux.interval(Duration.ofSeconds(30))
.doOnNext(i -> log.info("Process has been started: {}", i))
.flatMap(i ->
process(i)
.doOnError(e -> log.error("Error: {}", e.getMessage()))
.onErrorResume(e -> Mono.empty())
);
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 4
Process has been started: 5
Process has been started: 6
Process has been started: 7

你可以在序列的末尾使用.retry()操作符来重试任何错误,或者如果你需要更细粒度的控制,可以使用.retryWhen

Flux.interval(Duration.ofSeconds(30))
.doOnNext(i -> log.info("Process has been started: {}", i))
.flatMap(i ->
process(i)
)
.retry();

请注意,在这种情况下,每个错误都将触发重新订阅Flux.interval,并且序列将重新开始。

Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3

最新更新