使用cron在Spring中调度周期性反应性任务



通常我会这样做,以安排在给定时区使用cron在Spring中定期执行的作业:

@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
// Do stuff
}

这将阻止调用scheduleStuff的线程,直到作业完成。然而,在这种情况下,我想做的"事情"都是使用Springs的项目reactor的非阻塞构建块(即MonoFlux等(来实现的。

例如,假设我想定期触发此功能:

Flux<Void> stuff() {
return ..
}

当然,我可以简单地调用stuff().subscribe()(甚至stuff().block()(,但这会阻塞线程。对于非阻塞代码,有没有更好的方法可以实现与@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")相同的功能?

我使用的是Spring Boot 2.1。

实际上,subscribe()不会阻塞线程。如果您确实需要的话,您可以调用stuff().subscribeOn(Schedulers.parallel()).subscribe()或其他调度程序来确保执行将在单独的线程中完成

您可以将stuff方法封装在异步方法中

例如:

@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
stuffService.doStuffAsync();
}

使用异步方法的服务

public class StuffService() implements IStuffService {
@Async
public void doStuffAsync() {
// Call and subscribe to your flux method here
}
}

对CCD_ 10的调用将立即返回CCD_。

这里还有一个选项:

public class PeriodicReactiveTasksInSpring implements SmartLifecycle {
private final AtomicReference<Subscription> subscription;
private final Long executionPeriod;
public PeriodicReactiveTasksInSpring(Long executionPeriod) {
this.subscription = new AtomicReference<>();
this.executionPeriod = executionPeriod;
}
@Override
public void start() {
if (Objects.isNull(subscription.get())) {
updateConfig()
.doOnSubscribe(sub -> {
subscription.set(sub);
}).subscribe();
}
}
@Override
public void stop() {
Optional.ofNullable(subscription.get())
.ifPresent(sub -> {
sub.cancel();
subscription.set(null);
});
}
@Override
public boolean isRunning() {
return Objects.nonNull(subscription.get());
}

private Flux<Item> updateConfig() {
return Flux.interval(Duration.ofMillis(executionPeriod))
.subscribeOn(Schedulers.boundedElastic())
.flatMap(cfg -> {
// Do your job here
})
.onErrorContinue((err, msg) -> LOGGER.error("Error: {} message: {}", err, msg));
}
}

最新更新