我在春季批处理应用程序中有一个工作成功完成,但完成后,其线程进入等待状态。随着越来越多的job实例被执行,tomcat的线程数不断增加。
下面是作业配置:
@Bean
@Scope("singleton")
@Qualifier(value = "job1")
public Job job() {
return jobBuilderFactory.get("job1")
.incrementer(new RunIdIncrementer())
.flow(step1())
.end()
.preventRestart()
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Buylead, Buylead>chunk(10000)
.reader(itemReader())
.writer(itemWriter())
.build();
}
public JdbcCursorItemReader<Buylead> itemReader() {
JdbcCursorItemReader<Buylead> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql(" --- WHatever Query ----");
reader.setRowMapper(new MapperBL());
return reader;
}
public ItemWriter<Buylead> itemWriter(){
FlatFileItemWriter<Buylead> itemWriter = new FlatFileItemWriter() ;
itemWriter.setResource(new FileSystemResource("output/buyleadSolrDoc.xls"));
itemWriter.setLineAggregator(new DelimitedLineAggregator<Buylead>() { {
setFieldExtractor(new BeanWrapperFieldExtractor<Buylead>() { {
setNames(new String[] {*************** }); } }); } });
itemWriter.setShouldDeleteIfEmpty(true);
return itemWriter;
}
数据源保存在另一个配置类中。
和这里是Main类
@SpringBootApplication
@EnableBatchProcessing
public class Application extends DefaultBatchConfigurer{
public static void main(String[] args) {
SpringApplication.run(SearchApplication.class, args);
}
@Override
public void setDataSource(DataSource dataSource) {
// override to do not set datasource even if a datasource exist.
// initialize will use a Map based JobRepository (instead of database)
}
}
作业正在成功执行。但是它的线程进入等待状态。
和这里是应用程序的ThreadDump部分
http-nio-8080-exec-2" #22 daemon prio=5 os_prio=0 cpu=0.11ms elapsed=29.43s tid=0x00007fcb6d6ddd60 nid=0xc724 waiting on condition [0x00007fcae3ffe000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@15.0.1/Native Method)
- parking to wait for <0x0000000713c00710> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base@15.0.1/LockSupport.java:341)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@15.0.1/AbstractQueuedSynchronizer.java:505)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@15.0.1/ForkJoinPool.java:3137)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@15.0.1/AbstractQueuedSynchronizer.java:1614)
at java.util.concurrent.LinkedBlockingQueue.take(java.base@15.0.1/LinkedBlockingQueue.java:435)
at org.apache.tomcat.util.threads.TaskQueue.take(TaskQueue.java:108)
at org.apache.tomcat.util.threads.TaskQueue.take(TaskQueue.java:33)
at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@15.0.1/ThreadPoolExecutor.java:1056)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@15.0.1/ThreadPoolExecutor.java:1116)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@15.0.1/ThreadPoolExecutor.java:630)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(java.base@15.0.1/Thread.java:832)*
请建议如何关闭处于Park状态的线程
Spring Batch不直接创建或管理线程。它将其委托给不同位置的TaskExecutor
实现:
JobLauncher
委托任务执行器启动作业StepBuilder
委托任务执行器创建多线程步骤FlowBuilder
委托任务执行器创建拆分流并并行运行步骤AsyncItemProcessor
委托任务执行器在单独的线程中异步处理项TaskExecutorPartitionHandler
委托给任务执行器来处理不同线程中的分区 等
在所有这些情况下,线程生命周期是由底层TaskExecutor
实现管理的。如果您在Spring Batch中使用任务执行器,则需要确保在不再需要它时正确地停止/关闭它。从你分享的内容来看,你似乎没有在Spring Batch配置中使用任务执行器,所以你应该在应用程序上下文的某个地方定义一个没有正确停止的任务执行器。