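I am writing a Spring Batch application that needs to use threads, and I am using a partitioner. All of the logic, DAOs, and entities live in an external JAR.
Anyway, my problem is this: I split my application into 10 threads. When I look at the database table BATCH_STEP_EXECUTION, all of my partitions are there, but they are not being written at the same time. I don't know whether this is correct.
Here is my batch configuration: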
@Bean
public Job incomingJob() {
    return jobBuilderFactory.get("JOB_CONCEDER_PERFIL_NORMAL")
            .incrementer(new RunIdIncrementer())
            .start(partitionerStep())
            .build();
}

@Bean
public Step partitionerStep() {
    return stepBuilderFactory.get("PARTITIONER_STEP")
            .partitioner("slaveStep", partitioner())
            .step(stepConcederPerfilNormal())
            .gridSize(10)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step stepConcederPerfilNormal() {
    return stepBuilderFactory.get("STEP_CONCEDER_PERFIL")
            .<PerfilProcessorVO, PerfilConcessaoNormalVO>chunk(500)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            .skip(NonUniqueObjectException.class)
            .skipLimit(1000)
            .noRollback(NonUniqueObjectException.class)
            .build();
}

@Bean
public PerfilPartitioner partitioner() {
    return new PerfilPartitioner();
}

@Bean
@StepScope
public PerfilReader reader() {
    return new PerfilReader();
}

@Bean
@StepScope
public PerfilProcessor processor() {
    return new PerfilProcessor();
}

@Bean
@StepScope
public PerfilWriter writer() {
    return new PerfilWriter();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(20);
    executor.setCorePoolSize(10);
    executor.setAllowCoreThreadTimeOut(true);
    executor.setAwaitTerminationSeconds(100);
    return executor;
}
What I am seeing is that each thread only updates the table once the previous one has finished:
ID  VERSION  STEP_NAME                                 JOB_EXECUTION_ID  START_TIME                   END_TIME           STATUS     COMMIT_COUNT  READ_COUNT  FILTER_COUNT  WRITE_COUNT
1   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER6  0                 16/01/20 14:29:51,137000000  -                  STARTED    0             0           0             0
2   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER8  0                 16/01/20 14:29:51,138000000  -                  STARTED    0             0           0             0
3   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER5  0                 16/01/20 14:29:51,136000000  -                  STARTED    0             0           0             0
4   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER4  0                 16/01/20 14:29:51,135000000  -                  STARTED    0             0           0             0
5   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER1  0                 16/01/20 14:29:51,138000000  -                  STARTED    0             0           0             0
6   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER2  0                 16/01/20 14:29:51,136000000  -                  STARTED    0             0           0             0
7   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER3  0                 16/01/20 14:29:51,136000000  -                  STARTED    0             0           0             0
8   3        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER7  0                 16/01/20 14:29:51,139000000  16/01/20 14:34:33  COMPLETED  1             16          0             16
9   1        STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER9  0                 16/01/20 14:29:51,141000000  -                  STARTED    0             0           0             0
10  15       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER0  0                 16/01/20 14:29:51,135000000  -                  STARTED    14            16          0             8
0   1        PARTITIONER_STEP                          0                 16/01/20 14:29:43,813000000  -                  STARTED    0             0           0             0
Set a concurrency limit on the task executor:
@Bean
public TaskExecutor taskExecutor() {
    // setConcurrencyLimit(int) is defined on SimpleAsyncTaskExecutor.
    // ThreadPoolTaskExecutor has no such setter; there the pool size
    // settings bound the concurrency instead.
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    executor.setConcurrencyLimit(n); // n = maximum number of partitions running at once
    return executor;
}
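
To see what a concurrency limit does in isolation, here is a minimal plain-JDK sketch. It is not Spring's implementation: it uses a java.util.concurrent.Semaphore as a stand-in for the throttle, and the class name ConcurrencyLimitDemo, the method peakConcurrency, and the 50 ms sleep that plays the role of one partition's work are all made up for illustration. Like SimpleAsyncTaskExecutor with a limit set, it starts one thread per task and makes the submitting thread wait while the limit is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyLimitDemo {

    /**
     * Runs `tasks` short jobs, one thread each, while allowing at most
     * `limit` of them to execute at the same time. Returns the highest
     * number of jobs that were observed running simultaneously.
     */
    public static int peakConcurrency(int limit, int tasks) throws InterruptedException {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < tasks; i++) {
            permits.acquire(); // the submitting thread waits until a slot is free
            Thread t = new Thread(() -> {
                try {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                    Thread.sleep(50); // hypothetical stand-in for one partition's work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    permits.release(); // free the slot for the next task
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // wait for every job to finish
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("limit 3  -> peak " + peakConcurrency(3, 10));
        System.out.println("limit 10 -> peak " + peakConcurrency(10, 10));
    }
}
```

The reported peak can never exceed the limit, so with gridSize(10) a limit below 10 means some partitions sit in STARTED with zero counts until a slot frees up, while a limit of 10 or more lets all of them make progress together (assuming nothing else, such as the database connection pool, is the bottleneck).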