我想实现多个@Scheduled(具有固定延迟(任务,每个任务都有自己的线程池。
@Scheduled(fixedDelayString = "30000")
public void createOrderSchedule() {
//create 10 orders concurrently; wait for all to be finished
createOrder(10);
}
@Scheduled(fixedDelayString = "30000")
public void processOrderSchedule() {
//process 10 orders concurrently; wait for all to be finished
}
@Scheduled(fixedDelayString = "30000")
public void notifySchedule() {
//send notification for 10 orders concurrently; wait for all to be finished
}
我设法为每个调度器创建了不同的ThreadPoolTaskExecutor
,如下所示:
@Bean("orderPool")
public ThreadPoolTaskExecutor createOrderTaskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);
pool.setMaxPoolSize(10);
pool.setThreadNamePrefix("order-thread-pool-");
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}
..
我为每个任务提供了@Async
。
@Async("orderPool")
public void createOrder(Integer noOforders) {..}
和任务调度器配置
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(3);
return threadPoolTaskScheduler;
}
我使用CompletableFuture.allOf(..).join();
来等待每个任务完成,但它会每隔一个@Scheduled
任务就阻塞一次。
综上所述,我想实现以下目标:
- 每个
@Scheduled
任务都应该独立运行,而不会阻塞其他@Scheduled
任务 - 每个
@Scheduled
任务都应该有自己的线程池,这样它就可以同时运行多个子任务(比如10个子任务( - 每个
@Scheduled
任务必须等待每个触发器完成,而不需要再次调用
如何实现这一点?
在连续做了将近18个小时之后,我能够实现上面问题中提出的要求。抱歉这么晚了。
因此,流API提供了类似IntStream
等的接口来并行流式传输元素。这使我能够并行创建n
订单。(同时,在不同的调度器中并行处理k
订单。依此类推。(
IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)));
就这么简单。解决了1个用例。现在我希望这个调度器有自己的池。发现IntStream.parallel()
使用了ForkJoinPool
,它是我们自己的ExecutorService
的继任者,令我惊讶的是,spring为ForkJoinPool
提供了一个预配置的工厂bean,即ForkJoinPoolFactoryBean
。所以我创建了一个名为createOrderExecutor
的bean。
@Bean("createOrderExecutor")
public ForkJoinPoolFactoryBean createOrderExecutor() {
ForkJoinPoolFactoryBean createOrderPoolFactoryBean = new ForkJoinPoolFactoryBean();
createOrderPoolFactoryBean.setParallelism(10);
createOrderPoolFactoryBean.setAsyncMode(true);
createOrderPoolFactoryBean.setUncaughtExceptionHandler(null);
createOrderPoolFactoryBean.setThreadFactory(p -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
worker.setName("create-order-pool-" + worker.getPoolIndex());
return worker;
});
return createOrderPoolFactoryBean;
}
我在调度器类中自动连接了这个bean,并同时提交了所有订单,如下所示。
createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index))));
那里。解决了第二个用例。现在,这不会等待所有并行任务完成,只会异步触发它们。现在,ForkJoinTask
(submit()
返回(提供了一个get()
方法,该方法等待计算完成&返回结果。(但我不需要结果,我宁愿用try-catch
包围它们。而且,我只等完成。(
@Scheduled(fixedDelayString = "5000")
public void createOrderScheduler() {
createOrderExecutor.getObject().submit(() -> IntStream.range(0, inputIds.size())
.parallel().forEach(index -> createOrder(inputIds.get(index)))).get();
}
这解决了我的最后一个用例。我为应用程序中的所有调度程序执行了此操作。
相信我,我尝试了CompletableFuture
的几乎所有在线实现,但没能实现所有这些。