我使用Flux.interval
来运行计划任务
Flux
.interval(Duration.ofSeconds(30))
.doOnNext(duration -> log.info("Process has been started"))
.flatMap(duration -> customService.process())
.onErrorResume(throwable -> Flux.empty())
.subscribe();
这里的事情,如果一个异常发生在任何地方,onErrorResume
已被调用,但调度程序停止工作-它不再调用customService.process()
我需要这个继续调用方法每30秒即使有任何错误在任何地方。有人能帮我吗?
在您的情况下,.onErrorResume(throwable -> Flux.empty())
将完成没有错误信号的流程,但原始Flux
将被取消。
你有几个选项:
处理错误
处理process
级别的错误。在这种情况下,错误将被"忽略"。初始Flux
将继续。从技术上讲,这与在命令式编程中将process
方法包装在try-catch
中并忽略错误相同。
Flux.interval(Duration.ofSeconds(30))
.doOnNext(i -> log.info("Process has been started: {}", i))
.flatMap(i ->
process(i)
.doOnError(e -> log.error("Error: {}", e.getMessage()))
.onErrorResume(e -> Mono.empty())
);
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 4
Process has been started: 5
Process has been started: 6
Process has been started: 7
你可以在序列的末尾使用.retry()
操作符来重试任何错误,或者如果你需要更细粒度的控制,可以使用.retryWhen
。
Flux.interval(Duration.ofSeconds(30))
.doOnNext(i -> log.info("Process has been started: {}", i))
.flatMap(i ->
process(i)
)
.retry();
请注意,在这种情况下,每个错误都将触发重新订阅Flux.interval
,并且序列将重新开始。
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3
Error: oops 3
Process has been started: 0
Process has been started: 1
Process has been started: 2
Process has been started: 3