Spring Batch threading with a partitioner



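I am writing a Spring Batch application that needs to use threads, and I am using a partitioner. All of the logic, DAOs and entities live in an external jar.

Anyway, my problem is this: I split my application into 10 threads. When I look at the database table BATCH_STEP_EXECUTION, all of my partitions are there, but they are not written concurrently. I don't know whether this is correct.

Here is my batch configuration: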

    // Job with a single step: the partitioned master step.
    @Bean
    public Job incomingJob() {
        return jobBuilderFactory.get("JOB_CONCEDER_PERFIL_NORMAL")
                .incrementer(new RunIdIncrementer())
                .start(partitionerStep())
                .build();
    }

    // Master step: asks the partitioner for 10 partitions (gridSize) and
    // runs the worker step for each of them on the task executor.
    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get("PARTITIONER_STEP")
                .partitioner("slaveStep", partitioner())
                .step(stepConcederPerfilNormal())
                .gridSize(10)
                .taskExecutor(taskExecutor())
                .build();
    }

    // Worker step: chunks of 500 items; skips up to 1000
    // NonUniqueObjectException failures without rolling back.
    @Bean
    public Step stepConcederPerfilNormal() {
        return stepBuilderFactory.get("STEP_CONCEDER_PERFIL")
                .<PerfilProcessorVO, PerfilConcessaoNormalVO>chunk(500)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .faultTolerant()
                .skip(NonUniqueObjectException.class)
                .skipLimit(1000)
                .noRollback(NonUniqueObjectException.class)
                .build();
    }

    @Bean
    public PerfilPartitioner partitioner() {
        return new PerfilPartitioner();
    }

    // Step-scoped beans: one instance per partition's step execution.
    @Bean
    @StepScope
    public PerfilReader reader() {
        return new PerfilReader();
    }

    @Bean
    @StepScope
    public PerfilProcessor processor() {
        return new PerfilProcessor();
    }

    @Bean
    @StepScope
    public PerfilWriter writer() {
        return new PerfilWriter();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(20);
        executor.setCorePoolSize(10);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setAwaitTerminationSeconds(100);
        return executor;
    }
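A side note on the executor settings above: a ThreadPoolTaskExecutor wraps a java.util.concurrent.ThreadPoolExecutor, and its queue capacity defaults to unbounded unless setQueueCapacity is called. With an unbounded queue, the pool only grows beyond the core size when the queue is full, which never happens, so a max pool size of 20 would not be expected to come into play here. The sketch below uses only the JDK (no Spring) to illustrate that behaviour; the class and method names are made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original configuration.
class PoolSizeDemo {

    // Blocks until the latch opens; restores the interrupt flag if interrupted.
    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits `tasks` blocking tasks to a pool backed by an unbounded queue
     * and returns how many worker threads the pool actually created.
     */
    static int threadsCreated(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 0; i < tasks; i++) {
            // Every task waits on the latch, so no worker becomes free early.
            pool.execute(() -> awaitQuietly(release));
        }
        int size = pool.getPoolSize(); // workers started so far

        release.countDown(); // let all tasks finish
        pool.shutdown();     // queued tasks still run, then the threads exit
        return size;
    }

    public static void main(String[] args) {
        // core = 10, max = 20, 30 tasks: the extra 20 tasks wait in the queue.
        System.out.println(threadsCreated(10, 20, 30)); // prints 10
    }
}
```

So with a grid size of 10 and a core pool size of 10, all ten partitions can be handed a thread at once, which is consistent with the nearly identical START_TIME values in the table below.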

What I see is that each thread only updates the table once the previous one has finished.

ID  VERSION STEP_NAME                                   JOB_EXECUTION_ID    START_TIME                  END_TIME            STATUS      COMMIT  READ    FILTER  WRITE
1   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER6    0                   16/01/20 14:29:51,137000000                     STARTED     0       0       0       0   
2   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER8    0                   16/01/20 14:29:51,138000000                     STARTED     0       0       0       0   
3   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER5    0                   16/01/20 14:29:51,136000000                     STARTED     0       0       0       0   
4   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER4    0                   16/01/20 14:29:51,135000000                     STARTED     0       0       0       0   
5   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER1    0                   16/01/20 14:29:51,138000000                     STARTED     0       0       0       0   
6   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER2    0                   16/01/20 14:29:51,136000000                     STARTED     0       0       0       0   
7   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER3    0                   16/01/20 14:29:51,136000000                     STARTED     0       0       0       0   
8   3       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER7    0                   16/01/20 14:29:51,139000000 16/01/20 14:34:33   COMPLETED   1       16      0       16  
9   1       STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER9    0                   16/01/20 14:29:51,141000000                     STARTED     0       0       0       0   
10  15      STEP_CONCEDER_PERFIL:PERFIL_PARTITIONER0    0                   16/01/20 14:29:51,135000000                     STARTED     14      16      0       8   
0   1       PARTITIONER_STEP                            0                   16/01/20 14:29:43,813000000                     STARTED     0       0       0       0   

Set a concurrency limit on the task executor:

    @Bean
    public TaskExecutor taskExecutor() {
        // setConcurrencyLimit is defined on SimpleAsyncTaskExecutor;
        // ThreadPoolTaskExecutor has no such setter and is bounded by
        // its pool size instead.
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        // n is a placeholder: the maximum number of partitions to run at once.
        executor.setConcurrencyLimit(n);
        return executor;
    }
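To make clear what a concurrency limit means in practice, here is a minimal JDK-only sketch of the idea: a semaphore lets at most `limit` tasks work at the same time while the rest wait for a permit. This is an illustration of throttling in general, not Spring's implementation; the class name, method name and the 20 ms sleep are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class illustrating a concurrency limit.
class ConcurrencyLimitDemo {

    /**
     * Starts `tasks` threads, allows at most `limit` of them to work at once,
     * and returns the highest number of simultaneously working tasks observed.
     */
    static int peakConcurrency(int tasks, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);

        for (int i = 0; i < tasks; i++) {
            new Thread(() -> {
                try {
                    permits.acquire(); // wait here while the limit is reached
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // stand-in for processing one partition
                    } finally {
                        running.decrementAndGet();
                        permits.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        try {
            done.await(); // wait for every task to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Ten tasks with a limit of 3: the peak is never more than 3.
        System.out.println(peakConcurrency(10, 3));
    }
}
```

A limit equal to the grid size (10 in the question) lets every partition run at once; a smaller value makes the remaining partitions wait their turn.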
