Spring Boot:我们如何实现多个@Scheduled任务,每个任务都有自己的线程池



我想实现多个@Scheduled(具有固定延迟(任务,每个任务都有自己的线程池。

@Scheduled(fixedDelayString = "30000")
public void createOrderSchedule() {
//create 10 orders concurrently; wait for all to be finished
createOrder(10);
}
@Scheduled(fixedDelayString = "30000")
public void processOrderSchedule() {
//process 10 orders concurrently; wait for all to be finished
}
@Scheduled(fixedDelayString = "30000")
public void notifySchedule() {
//send notification for 10 orders concurrently; wait for all to be finished
}

我设法为每个调度器创建了不同的ThreadPoolTaskExecutor,如下所示:

@Bean("orderPool")
public ThreadPoolTaskExecutor createOrderTaskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);
pool.setMaxPoolSize(10);
pool.setThreadNamePrefix("order-thread-pool-");
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}
..

我为每个任务提供了@Async

@Async("orderPool")
public void createOrder(Integer noOforders) {..}

和任务调度器配置

@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(3);
return threadPoolTaskScheduler;
}

我使用CompletableFuture.allOf(..).join();来等待每个任务完成,但它会每隔一个@Scheduled任务就阻塞一次。

综上所述,我想实现以下目标:

  1. 每个@Scheduled任务都应该独立运行,而不会阻塞其他@Scheduled任务
  2. 每个@Scheduled任务都应该有自己的线程池,这样它就可以同时运行多个子任务(比如10个子任务(
  3. 每个@Scheduled任务必须等待每个触发器完成,而不需要再次调用

如何实现这一点?

在连续做了将近18个小时之后,我能够实现上面问题中提出的要求。抱歉这么晚了。

因此,流API提供了类似IntStream等的接口来并行流式传输元素。这使我能够并行创建n订单。(同时,在不同的调度器中并行处理k订单。依此类推。(

IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)));

就这么简单。解决了1个用例。现在我希望这个调度器有自己的池。发现IntStream.parallel()使用了ForkJoinPool,它是我们自己的ExecutorService的继任者,令我惊讶的是,spring为ForkJoinPool提供了一个预配置的工厂bean,即ForkJoinPoolFactoryBean。所以我创建了一个名为createOrderExecutor的bean。

@Bean("createOrderExecutor")
public ForkJoinPoolFactoryBean createOrderExecutor() {
ForkJoinPoolFactoryBean createOrderPoolFactoryBean = new ForkJoinPoolFactoryBean();
createOrderPoolFactoryBean.setParallelism(10);
createOrderPoolFactoryBean.setAsyncMode(true);
createOrderPoolFactoryBean.setUncaughtExceptionHandler(null);
createOrderPoolFactoryBean.setThreadFactory(p -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
worker.setName("create-order-pool-" + worker.getPoolIndex());
return worker;
});
return createOrderPoolFactoryBean;
}

我在调度器类中自动连接了这个bean,并同时提交了所有订单,如下所示。

createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index))));

那里。解决了第二个用例。现在,这不会等待所有并行任务完成,只会异步触发它们。现在,ForkJoinTask(submit()返回(提供了一个get()方法,该方法等待计算完成&返回结果。(但我不需要结果,我宁愿用try-catch包围它们。而且,我只等完成。(

@Scheduled(fixedDelayString = "5000")
public void createOrderScheduler() {
createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)))).get();
}

这解决了我的最后一个用例。我为应用程序中的所有调度程序执行了此操作。

相信我,我尝试了CompletableFuture的几乎所有在线实现,但没能实现所有这些。

最新更新