Spring Batch multithreading issue with IteratorItemReader



I am new to Spring Batch and still learning. I have a batch configuration with an IteratorItemReader, a custom processor, and a custom writer, as shown below:

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Value("${inputFile.location}")
    private String inputFile;

    @Bean
    public Job testJob() throws IOException {
        return jobBuilderFactory.get("testJob")
                .incrementer(new RunIdIncrementer())
                .start(testStep())
                .listener(new JobListener())
                .build();
    }

    @Bean
    public Step testStep() throws IOException {
        return stepBuilderFactory.get("testStep")
                .<File, File>chunk(1)
                .reader(testReader())
                .processor(testProcessor())
                .writer(testWriter())
                // Multi-threaded step: chunks run on the pool's worker threads
                .taskExecutor(threadPoolTaskExecutor())
                .build();
    }

    @Bean
    public ItemReader<File> testReader() throws IOException {
        // Collect the regular files directly under the input directory
        List<File> files = Files.walk(Paths.get(inputFile), 1)
                .filter(Files::isRegularFile)
                .map(Path::toFile)
                .collect(Collectors.toList());
        return new IteratorItemReader<>(files);
    }

    @Bean
    public CustomProcessor testProcessor() {
        return new CustomProcessor();
    }

    @Bean
    public CustomWriter testWriter() {
        return new CustomWriter();
    }

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(6);
        executor.setQueueCapacity(4);
        executor.initialize();
        return executor;
    }

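Here testReader() scans the given input path, collects all files into a list, and returns an IteratorItemReader over that list. The business logic then runs in the processor.

If the input location contains more than one file, everything works fine and I get no errors.

Problem statement: suppose the input location contains only one file (e.g. C:/User/documents/abc.txt). One thread processes the file completely and everything works, but at the end I get the following exception: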

ERROR - Encountered an error executing step testStep in job testJob
java.util.NoSuchElementException: null
    at java.util.ArrayList$Itr.next(ArrayList.java:864)
    at org.springframework.batch.item.support.IteratorItemReader.read(IteratorItemReader.java:70)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

This exception occurs because of multithreading. When I looked at line 70 of the IteratorItemReader class, I found the following code:

    if (iterator.hasNext())
        return iterator.next();
    else
        return null; // end of data
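The interleaving behind this exception can be replayed by hand, without any threads. The following is only a minimal sketch, not Spring Batch code, and the names `CheckThenActDemo` and `raceOnSingleElement` are hypothetical. It assumes two workers share one iterator over a single-element list and that both pass the `hasNext()` check before either calls `next()`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class: replays the check-then-act race in a single thread.
class CheckThenActDemo {

    // Returns true if the second "worker" hits NoSuchElementException.
    static boolean raceOnSingleElement() {
        List<String> files = new ArrayList<>();
        files.add("abc.txt");
        Iterator<String> iterator = files.iterator();

        // Both workers evaluate the check before either one advances the iterator.
        boolean workerASeesItem = iterator.hasNext();
        boolean workerBSeesItem = iterator.hasNext();

        if (workerASeesItem) {
            iterator.next(); // worker A consumes the only element
        }
        try {
            if (workerBSeesItem) {
                iterator.next(); // worker B acts on a check that is now stale
            }
            return false;
        } catch (NoSuchElementException e) {
            return true; // same exception type as in the stack trace above
        }
    }
}
```

With real threads the same ordering depends on scheduling, which would explain why the error does not show up on every run.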

What is the best way to solve this problem? Please share your input.

Thanks in advance.

Any suggestions would be helpful.

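Wrapping it directly in a SynchronizedItemStreamReader does not work, because that wrapper requires an ItemStreamReader as its delegate and IteratorItemReader is not an ItemStreamReader. I think the best option is to create a custom SynchronizedIteratorItemReader. It would be identical to IteratorItemReader, except that the read() method is marked synchronized, so that the hasNext() check and the next() call run as one atomic step.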
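A minimal sketch of such a reader might look like the following. To keep it compilable without Spring Batch on the classpath, it declares a local stand-in `ItemReader` interface with a simplified `read()` signature. That stand-in is an assumption for illustration only; real code would implement `org.springframework.batch.item.ItemReader` instead. The class name `SynchronizedIteratorItemReader` is the hypothetical one suggested above, not an existing Spring Batch class.

```java
import java.util.Iterator;

// Stand-in for org.springframework.batch.item.ItemReader (assumption: simplified
// signature so the sketch compiles without Spring Batch on the classpath).
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical reader: same logic as IteratorItemReader, but read() is synchronized.
class SynchronizedIteratorItemReader<T> implements ItemReader<T> {

    private final Iterator<T> iterator;

    SynchronizedIteratorItemReader(Iterable<T> iterable) {
        if (iterable == null) {
            throw new IllegalArgumentException("iterable must not be null");
        }
        this.iterator = iterable.iterator();
    }

    // Only one thread at a time can run the check and the fetch,
    // so no thread can call next() based on a stale hasNext() result.
    @Override
    public synchronized T read() {
        if (iterator.hasNext()) {
            return iterator.next();
        }
        return null; // end of data
    }
}
```

In the configuration from the question, testReader() would then return `new SynchronizedIteratorItemReader<>(files)` instead of `new IteratorItemReader<>(files)`. The lock only serializes the cheap iterator access; the processor and writer still run concurrently on the worker threads.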
