我想从输入文件(CSV)预先加载一些参考数据作为初始步骤之一(预努力引用)。以下步骤将读取主要输入文件,其中需要处理的数据。要处理的每个记录都需要在预加载引用步骤中查找组数据。如何才能做到这一点?!可以将参考数据与数据记录一起传递到主处理器例程中?
- 我们是否可以假设您需要一个有两个步骤的作业,并且第一步将数据传递到第二步?如果是这种情况,您可以尝试:
- 将数据存储到第1步的
ExecutionContext
中。 - 在
ExecutionContextPromotionListener
的帮助下,将这些数据促进了下一步。 - 将数据消耗到第2步中。
我会以一个非常幼稚的示例来备份我的话,部分受到正式文档的启发。
在第一步中,我们正在"读取"值:
class Step1Reader implements ItemReader<String> {
private AtomicLong counter = new AtomicLong();
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
long cnt = counter.incrementAndGet();
return cnt > 5 ? null : String.valueOf(cnt);
}
}
然后,通过在末期将当前时间附加到末期:
来"处理"它们class Step1Processor implements ItemProcessor<String, String> {
@Override
public String process(String item) throws Exception {
return item + "~" + System.currentTimeMillis();
}
}
最终将它们输出到标准输出中,并在上下文中存储一些垃圾(我将忽略多线程问题):
class Step1Writer implements ItemWriter<String> {
private StepExecution stepExecution;
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(System.out::println);
Integer hashCode = (Integer)stepExecution.getExecutionContext().get("key");
stepExecution.getExecutionContext().put("key", hashCode == null ? items.hashCode() : hashCode + items.hashCode());
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
这应该输出以下内容:
1~1505055135727
2~1505055135727
3~1505055135727
4~1505055135727
5~1505055135727
到目前为止一切都很好。下一步是"促进"密钥:
@Bean
ExecutionContextPromotionListener executionContextPromotionListener()
{
ExecutionContextPromotionListener ret = new ExecutionContextPromotionListener();
ret.setKeys(new String[]{"key"});
return ret;
}
private Step step1() {
return stepBuilderFactory.get("step1")
.<String, String> chunk(10)
.reader(new Step1Reader())
.processor(new Step1Processor())
.writer(new Step1Writer())
.listener(promotionListener)
.build();
}
在第二个甚至更幼稚的步骤中,可以像这样访问该值:
class Step2Writer implements ItemWriter<String> {
private Object obj;
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(s -> System.out.println(s + "~" + obj));
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.obj = jobContext.get("key");
}
}
希望有帮助: - )
不建议将大量数据存储到StepExecutionContext
中,因为此上下文已序列化到SB元数据表,如果数据太大,您将获得数据截断错误。您可以使用特定的DAO访问此数据并将访问请求放在缓存下:无需减慢预加载和记忆消耗将使您的工作变得符合您的工作。您还将获得重新启动性。