如何在春季批处理中创建动态多个作业



我已经创建了多个作业,并且正在循环中运行多个作业。我要求按顺序运行这些作业,但所有步骤的项目读取器都在运行,然后所有作业的项目写入器都在按顺序运行。有没有解决方案可以让一个作业的项目读取器和项目写入器完成后,控制权转到另一个作业

示例代码如下

public void jobprocess() throws Exception {
List<Job> jobList = BatchJob.fetchAllJobs();
for(Job job:jobList){            
jobLauncher.run(job, params);
}
}

public List<Job> fetchAllJobs() throws Exception {
List<Object> list = service.findAll();
List<Job> jobBuilderList = new ArrayList<>();
int i = 0;
for(Object obj:list){

Step step = stepBuilderFactory.get("step"+i)
.<Student, Student>chunk(500)
.reader(ListItemReaderV2(obj))
.processor(Processor)
.writer(Writer)
.build();

jobBuilderList.add(jobBuilderFactory.get("job"+i)
.incrementer(new RunIdIncrementer())
.listener(this)
.start(step)
.build());
i++;

}
return jobBuilderList;
}

您应该构建一个超级作业,将多个作业封装为步骤。通过这种方式,您可以要求Spring批处理按顺序运行步骤(因此是您的作业(。在Spring批处理中,有一个特定的步骤(JobStep(允许这样做:


public void  jobprocess() throws Exception {
jobLauncher.run(superJob(), params);
}
public Job superJob(){
List<Step> stepList = BatchJob.fetchAllJobs().stream()
.map(j-> stepBuilderFactory.get(j.getName())
.job(j)
.build()
).collect(Collectors.toList());
SimpleJobBuilder simpleJobBuilder=jobBuilderFactory.get("superJob")
.start(stepList.get(0));
for(int i = 1; i < stepList.size() ; i++){
simpleJobBuilder.next(stepList.get(i))
}
return simpleJobBuilder.build();
}

对老式的循环编码来说很遗憾:这是我所看到的排除stepList的第一项的最简单的方法。注意,我使用了一个默认行为,将父作业参数注入到每个JobStep中(参见。https://docs.spring.io/spring-batch/docs/4.2.x/api/org/springframework/batch/core/step/job/JobStep.html#setJobParametersExtractor-org.springframework.batch.core.step.job.JobParametersExtractor-(

经过大量实验,我发现这个问题是由于在作业类中使用了ListItemReader((方法造成的。

ListItemReader<Object> listItemReader(object obj) {
return new ListItemReader<>(service.getList()); }

在上面的实现中,listItemReader((方法甚至在作业开始之前就在调用,而作业是在执行listItemRead((方法之后开始的。

现在,我通过实现ItemReader接口创建了一个单独的阅读器类,现在我不返回ListItemReader,而是返回Only ItemReader。通过实现这一点,我的工作正在按预期运行。

阅读器类的伪代码为:

public class ReaderClass implements ItemReader<Object> {
final private DataService dataService;
final private entity entity;
private List<Object> responseList;
private int count = 0;
@Override
public Product read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(responseList==null){
responseList = DataService.fetchData(entity);
}        
Object obj = null;
if (count < responseList.size()) {
obj = responseList.get(count);
count++;
}
return obj;
}
}

相关内容

  • 没有找到相关文章

最新更新