read() runs once per taskExecutor thread, so a multi-threaded step gets duplicate data



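I am creating a job that fetches data from BigQuery and processes it. My approach is to fetch the data in the reader, then process it in chunks, using a task executor to run the chunks on different threads.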

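TripDateTimeDecider decides the range that the query in the reader will cover. TransactionReader executes the query to load the data. TransactionProcessor processes the loaded data. TransactionWriter writes the data to a table.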

Flow I want: TripDateTimeDecider -> TransactionReader (fetches data from the BigQuery table once) -> threads that each run TransactionProcessor and TransactionWriter on their own chunk.

But what I get: TripDateTimeDecider -> multiple threads each run TransactionReader and read the same data -> threads run TransactionProcessor and TransactionWriter on the same data.

- 2023-04-11 12:50:57.456 [taskExecutor-3] INFO  c.q.p.p.steps.TransactionReader - TransactionReader::read() for tripStartDateTime=  2022-03-01T00:00:00  and tripIntervalDateTime= 2022-03-01T06:00:00.0
- 2023-04-11 12:51:01.286 [taskExecutor-3] INFO  c.q.p.p.utils.BigQuerySalesTransUtil - loadTransactionsFromURT for trip_start_date_time=2022-03-01T00:00:00  , tripIntervalDateTime= 2022-03-01T06:00:00.0 and currentEnv = dev
- 2023-04-11 12:51:01.287 [taskExecutor-4] INFO  c.q.p.p.steps.TransactionReader - TransactionReader::read() for tripStartDateTime=  2022-03-01T00:00:00  and tripIntervalDateTime= 2022-03-01T06:00:00.0
- 2023-04-11 12:51:01.287 [taskExecutor-4] INFO  c.q.p.p.utils.BigQuerySalesTransUtil - loadTransactionsFromURT for trip_start_date_time=2022-03-01T00:00:00  , tripIntervalDateTime= 2022-03-01T06:00:00.0 and currentEnv = dev
- 2023-04-11 12:51:04.792 [taskExecutor-2] INFO  c.q.p.p.steps.TransactionReader - TransactionReader::read() for tripStartDateTime=  2022-03-01T00:00:00  and tripIntervalDateTime= 2022-03-01T06:00:00.0
- 2023-04-11 12:51:04.792 [taskExecutor-2] INFO  c.q.p.p.utils.BigQuerySalesTransUtil - loadTransactionsFromURT for trip_start_date_time=2022-03-01T00:00:00  , tripIntervalDateTime= 2022-03-01T06:00:00.0 and currentEnv = dev
- 2023-04-11 12:51:04.792 [taskExecutor-1] INFO  c.q.p.p.steps.TransactionReader - TransactionReader::read() for tripStartDateTime=  2022-03-01T00:00:00  and tripIntervalDateTime= 2022-03-01T06:00:00.0
- 2023-04-11 12:51:04.792 [taskExecutor-1] INFO  c.q.p.p.utils.BigQuerySalesTransUtil - loadTransactionsFromURT for trip_start_date_time=2022-03-01T00:00:00  , tripIntervalDateTime= 2022-03-01T06:00:00.0 and currentEnv = dev




@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
public class ReceiptScanningMicroBlinkJobConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private TripDateTimeDecider tripDateTimeDecider;

    @Autowired
    private MicroBlinkJobInitTasklet microBlinkJobInitTasklet;

    @Autowired
    private MicroBlinkJobEndTasklet microBlinkJobEndTasklet;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private static final String WILL_BE_INJECTED = null;

    @Bean
    @StepScope
    public ItemReader<TransactionReceiptScanRequest> transactionReader(
            @Value("#{jobExecutionContext['trip_start_date_time']}") String tripStartDateTime,
            @Value("#{jobExecutionContext['trip_interval_date_time']}") String tripIntervalDateTime,
            @Value("#{jobExecutionContext['interval_hours']}") String intervalHours,
            @Value("#{jobExecutionContext['ignored_status_code']}") String ignoredStatusCode) {
        return new TransactionReader(tripStartDateTime, tripIntervalDateTime, intervalHours, ignoredStatusCode);
    }

    @Bean
    @StepScope
    public ItemProcessor<TransactionReceiptScanRequest, TransactionReceiptScanRequest> transactionProcessor() {
        return new TransactionProcessor();
    }

    @Bean
    @StepScope
    public ItemWriter<TransactionReceiptScanRequest> transactionWriter() {
        return new TransactionWriter();
    }

    @Bean
    protected Step processLines() {
        return steps.get("processEntities").<TransactionReceiptScanRequest, TransactionReceiptScanRequest> chunk(10)
                .reader(transactionReader(WILL_BE_INJECTED, WILL_BE_INJECTED, WILL_BE_INJECTED, WILL_BE_INJECTED))
                .processor(transactionProcessor())
                .writer(transactionWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Job job() {

        Flow flow = new FlowBuilder<SimpleFlow>("Job")
                .next(tripDateTimeDecider)
                .on(Constants.COMPLETED)
                .end()
                .from(tripDateTimeDecider)
                .on(Constants.CONTINUE)
                .to(initJobExecutionStep())
                .next(processLines())
                .next(endJobExecutionStep())
                .next(tripDateTimeDecider)
                .on(Constants.COMPLETED)
                .end()
                .build();

        return jobs.get("Job")
                .incrementer(new RunIdIncrementer())
                .listener(new DefaultJobListener())
                .start(flow)
                .end()
                .build();
    }

    // start -> Init tasklet to get max trip date and put in context
    // startdate and endDate to reader
    // only columns

    @Bean
    public Step initJobExecutionStep() {
        return stepBuilderFactory
                .get("microBlinkJobInitTasklet")
                .tasklet(microBlinkJobInitTasklet)
                .build();
    }

    @Bean
    public Step endJobExecutionStep() {
        return stepBuilderFactory
                .get("microBlinkJobEndTasklet")
                .tasklet(microBlinkJobEndTasklet)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
        threadPoolExecutor.setCorePoolSize(5);
        threadPoolExecutor.setMaxPoolSize(5);
        threadPoolExecutor.setQueueCapacity(10);
        // multiple instances jobs 5.5 Million ->63 days
        return threadPoolExecutor;
    }
}


The above is the batch job configuration, based on
https://examples.javacodegeeks.com/java-development/enterprise-java/spring/batch/spring-batch-multithreading-example/
I want the reader to run once, and then the processor and writer should run in multiple threads based on the configured chunk size.

This means your item reader is not thread-safe. You need to synchronize the reader by wrapping it in a SynchronizedItemStreamReader.
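To illustrate why synchronizing the reader removes the duplicates, here is a minimal plain-Java sketch of the idea, not the Spring Batch class itself. `SimpleReader`, `LoadOnceReader`, and `SynchronizedReaderSketch` are hypothetical names invented for this example; `SimpleReader` only mimics the `ItemReader` contract of returning `null` when the input is exhausted. In a real job you would instead set your reader as the delegate of `SynchronizedItemStreamReader`, which expects the delegate to implement `ItemStreamReader`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for the ItemReader contract: read() returns null when exhausted.
interface SimpleReader<T> {
    T read();
}

// Loads the data lazily on the first read() and hands out each item exactly once.
// Because read() is synchronized, only one thread can trigger the load.
final class LoadOnceReader<T> implements SimpleReader<T> {
    private final Supplier<List<T>> loader; // stands in for the expensive query
    private Iterator<T> items;              // null until the first read()
    private int loadCount;                  // how many times the loader ran

    LoadOnceReader(Supplier<List<T>> loader) {
        this.loader = loader;
    }

    @Override
    public synchronized T read() {
        if (items == null) {
            items = loader.get().iterator();
            loadCount++;
        }
        return items.hasNext() ? items.next() : null;
    }

    synchronized int loadCount() {
        return loadCount;
    }
}

final class SynchronizedReaderSketch {
    // Drains the reader from several worker threads and returns the items, sorted.
    static List<Integer> drainConcurrently(SimpleReader<Integer> reader, int threads) {
        ConcurrentLinkedQueue<Integer> seen = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<Integer> result = new ArrayList<>(seen);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        LoadOnceReader<Integer> reader = new LoadOnceReader<>(() -> {
            List<Integer> rows = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                rows.add(i);
            }
            return rows;
        });
        List<Integer> items = drainConcurrently(reader, 5);
        System.out.println("items read: " + items.size() + ", loads: " + reader.loadCount());
    }
}
```

Without the `synchronized` keyword, several threads could see `items == null` at the same time and each run the loader, which matches the repeated `loadTransactionsFromURT` lines in the log above.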

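Another option is to split the input into separate partitions and process the partitions concurrently with multiple threads. In that case, each thread reads items sequentially from the partition assigned to it.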
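As a rough sketch of the partitioning idea, the time window from the question could be cut into non-overlapping sub-ranges, one per worker. The code below is plain Java and only shows the splitting logic; `TimeRange` and `TimeRangePartitioner` are hypothetical names, and the choice to split by time is an assumption based on the `tripStartDateTime` and `tripIntervalDateTime` values in the log. In Spring Batch, similar logic would typically live in a `Partitioner` implementation that puts each sub-range into its own `ExecutionContext`.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type: a half-open time window [start, end).
final class TimeRange {
    final LocalDateTime start; // inclusive
    final LocalDateTime end;   // exclusive

    TimeRange(LocalDateTime start, LocalDateTime end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

final class TimeRangePartitioner {
    // Splits [start, end) into at most gridSize contiguous, non-overlapping ranges.
    // The last range absorbs any remainder so the whole window is covered.
    static List<TimeRange> split(LocalDateTime start, LocalDateTime end, int gridSize) {
        if (gridSize <= 0 || !start.isBefore(end)) {
            throw new IllegalArgumentException("need gridSize > 0 and start < end");
        }
        long totalSeconds = Duration.between(start, end).getSeconds();
        long step = Math.max(1, totalSeconds / gridSize);
        List<TimeRange> ranges = new ArrayList<>();
        LocalDateTime from = start;
        for (int i = 0; i < gridSize && from.isBefore(end); i++) {
            LocalDateTime to = (i == gridSize - 1) ? end : from.plusSeconds(step);
            if (to.isAfter(end)) {
                to = end;
            }
            ranges.add(new TimeRange(from, to));
            from = to;
        }
        return ranges;
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.parse("2022-03-01T00:00:00");
        LocalDateTime end = LocalDateTime.parse("2022-03-01T06:00:00");
        // Each printed range would be queried and processed by a single thread.
        for (TimeRange range : split(start, end, 3)) {
            System.out.println(range);
        }
    }
}
```

Because each thread then owns its own range and its own reader, no two threads ever query the same rows, so the reader itself does not need to be thread-safe.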
