Why do I get this error when trying to launch multiple Spring Batch jobs? The bean 'jobLauncher' ... could not be registered



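I am working on a Spring Batch application that contains two different Job beans (representing two different jobs). Both jobs must be executed by my application; for now it does not matter whether they run sequentially or in parallel.

To implement this behavior I tried to follow this documentation, but I ran into several problems: https://newbedev.com/spring-batch-running-multiple-jobs-in-parallel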

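I will try to explain my situation and the problems I am encountering.

First, I have this configuration class, in which my two Job objects (and the related steps) are declared: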

@Configuration
public class UpdateInfoBatchConfig {


private static final String PROPERTY_REST_API_URL = "rest.api.url";

@Autowired
private NotaryListServiceAdapter notaryListServiceAdapter;

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

@Autowired
private NotaryService notaryService;


@Bean("firstStepItemReader")
public ItemReader<NotaryDistrict> itemReader(Environment environment, RestTemplate restTemplate) throws IllegalStateException, URISyntaxException {

System.out.println("itemReader() START !!!");

return new RESTNotaryDistrictsReader();     
}

@Bean("firstStepItemWriter")
public ItemWriter<NotaryDistrict> itemWriter() {
return new LoggingItemWriter();
}

@Bean("secondStepItemReader")
public ItemReader<NotaryDistrict> secondStepReader(Environment environment) throws IllegalStateException {

System.out.println("secondStepItemReader() creation !!!");

return new SecondStepItemReader();  
}

@Bean("secondStepItemProcessor")
public ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor() {
return new SecondStepItemProcessor();
}

@Bean("secondStepItemWriter")
public ItemWriter<NotaryDistrict> secondStepItemWriter() {
return new SecondStepItemWriter();
}


/**
************************************ UPDATE NOTARY DISTRICTS LIST JOB SECTION ********************************************
*/
/**
* Creates a bean that represents the first step of the batch.
* How it works:
* 1) Call an external API in order to retrieve notary districts list
* 2) Return notary district one by one to the second step
* @param reader a custom reader calling an external API
* @param writer
* @param stepBuilderFactory
* @return
*/
@Bean("firstStep")
public Step firstStep(@Qualifier("firstStepItemReader") ItemReader<NotaryDistrict> reader,
@Qualifier("firstStepItemWriter") ItemWriter<NotaryDistrict> writer,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("updateNotaryDistrictsStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(reader)
.writer(writer)
.build();
}

@Bean("secondStep")
public Step secondStep(@Qualifier("secondStepItemReader") ItemReader<NotaryDistrict> secondStepItemReader,
@Qualifier("secondStepItemProcessor") ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor,
@Qualifier("secondStepItemWriter") ItemWriter<NotaryDistrict> secondStepItemWriter,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("secondStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(secondStepItemReader)
.processor(secondStepItemProcessor)
.writer(secondStepItemWriter)
.build();
}

@Bean("updateNotaryDistrictsJob")
public Job updateNotaryDistrictsJob(JobBuilderFactory jobBuilderFactory,
@Qualifier("firstStep") Step firstStep,
@Qualifier("secondStep") Step secondStep) {
return jobBuilderFactory.get("updateNotaryDistrictsJob")
.start(firstStep)
.next(secondStep)
//.next(playerSummarization())
.build();
}

@Bean
public ExecutionContext executionContext() {
return new ExecutionContext();

}

/**
************************************ UPDATE NOTARY LIST JOB SECTION ********************************************
*/
@Bean()
public ItemReaderAdapter serviceItemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(notaryListServiceAdapter);
reader.setTargetMethod("nextNotaryElement");
return reader;

}

@Bean
public Step readNotaryListStep(){
return steps.get("readNotaryListStep").
<Integer,Integer>chunk(1)  
.reader(serviceItemReader())
.processor(new NotaryDetailsEnrichProcessor(notaryService))
.writer(new ConsoleItemWriter())
.build();
}

@Bean("updateNotaryListInfoJob")
public Job updateNotaryListInfoJob(){
return jobs.get("updateNotaryListInfoJob")
.incrementer(new RunIdIncrementer())
.start(readNotaryListStep())
.build();
}

}

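Then, initially, I created this SpringBatchExampleJobLauncher launcher class. It worked fine and was originally used to launch a single job (I think I have to change the logic of this launcher class so that it executes two jobs instead of one):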

public class SpringBatchExampleJobLauncher {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchExampleJobLauncher.class);

    private final Job job;
    private final JobLauncher jobLauncher;
    private ExecutionContext executionContext;

    @Autowired
    public SpringBatchExampleJobLauncher(@Qualifier("updateNotaryDistrictsJob") Job job,
                                         JobLauncher jobLauncher,
                                         ExecutionContext executionContext) {
        this.job = job;
        this.jobLauncher = jobLauncher;
        this.executionContext = executionContext;
    }

    //@Scheduled(cron = "0 */3 * * * *")
    @Scheduled(cron = "0/30 * * * * *")
    public void runSpringBatchExampleJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        LOGGER.info("Spring Batch example job was started");

        List<NotaryDistrict> notaryDistrictsList = new ArrayList<NotaryDistrict>();
        executionContext.put("notaryDistrictsList", notaryDistrictsList);

        jobLauncher.run(job, newExecution());
        LOGGER.info("Spring Batch example job was stopped");
    }

    private JobParameters newExecution() {
        Map<String, JobParameter> parameters = new HashMap<>();
        JobParameter parameter = new JobParameter(new Date());
        parameters.put("currentTime", parameter);
        return new JobParameters(parameters);
    }
}
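
As you can see, this class is very simple: its constructor takes a specific Job (identified by @Qualifier and defined in the configuration class above), the JobLauncher used to run this job, and the ExecutionContext.

It also contains the runSpringBatchExampleJob() method, which runs this job every 30 seconds (as specified by the CRON expression).

OK, so in order to launch my two jobs, I think I need to change this SpringBatchExampleJobLauncher class along the lines shown here: https://newbedev.com/spring-batch-running-multiple-jobs-in-parallel

This is what I did. First, I added the ThreadPoolTaskExecutor and JobLauncher bean definitions to my UpdateInfoBatchConfig configuration class, which became the following: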

@Configuration
public class UpdateInfoBatchConfig {


private static final String PROPERTY_REST_API_URL = "rest.api.url";

@Autowired
private NotaryListServiceAdapter notaryListServiceAdapter;

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

@Autowired
private NotaryService notaryService;


@Bean("firstStepItemReader")
public ItemReader<NotaryDistrict> itemReader(Environment environment, RestTemplate restTemplate) throws IllegalStateException, URISyntaxException {

System.out.println("itemReader() START !!!");

return new RESTNotaryDistrictsReader();

}

@Bean("firstStepItemWriter")
public ItemWriter<NotaryDistrict> itemWriter() {
return new LoggingItemWriter();
}

@Bean("secondStepItemReader")
public ItemReader<NotaryDistrict> secondStepReader(Environment environment) throws IllegalStateException {

System.out.println("secondStepItemReader() creation !!!");

return new SecondStepItemReader();

}

@Bean("secondStepItemProcessor")
public ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor() {
return new SecondStepItemProcessor();
}

@Bean("secondStepItemWriter")
public ItemWriter<NotaryDistrict> secondStepItemWriter() {
return new SecondStepItemWriter();
}
/**
* Creates a bean that represents the first step of the batch.
* How it works:
* 1) Call an external API in order to retrieve notary districts list
* 2) Return notary district one by one to the second step
* @param reader a custom reader calling an external API
* @param writer
* @param stepBuilderFactory
* @return
*/
@Bean("firstStep")
public Step firstStep(@Qualifier("firstStepItemReader") ItemReader<NotaryDistrict> reader,
@Qualifier("firstStepItemWriter") ItemWriter<NotaryDistrict> writer,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("updateNotaryDistrictsStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(reader)
.writer(writer)
.build();
}

@Bean("secondStep")
public Step secondStep(@Qualifier("secondStepItemReader") ItemReader<NotaryDistrict> secondStepItemReader,
@Qualifier("secondStepItemProcessor") ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor,
@Qualifier("secondStepItemWriter") ItemWriter<NotaryDistrict> secondStepItemWriter,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("secondStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(secondStepItemReader)
.processor(secondStepItemProcessor)
.writer(secondStepItemWriter)
.build();
}

@Bean("updateNotaryDistrictsJob")
public Job updateNotaryDistrictsJob(JobBuilderFactory jobBuilderFactory,
@Qualifier("firstStep") Step firstStep,
@Qualifier("secondStep") Step secondStep) {
return jobBuilderFactory.get("updateNotaryDistrictsJob")
.start(firstStep)
.next(secondStep)
//.next(playerSummarization())
.build();
}

@Bean
public ExecutionContext executionContext() {
return new ExecutionContext();

}

/**
************************************ UPDATE NOTARY LIST JOB ********************************************
*/
@Bean()
public ItemReaderAdapter serviceItemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(notaryListServiceAdapter);
reader.setTargetMethod("nextNotaryElement");
return reader;

}

@Bean
public Step readNotaryListStep(){
return steps.get("readNotaryListStep").
<Integer,Integer>chunk(1)  
.reader(serviceItemReader())
.processor(new NotaryDetailsEnrichProcessor(notaryService))
.writer(new ConsoleItemWriter())
.build();
}

@Bean("updateNotaryListInfoJob")
public Job updateNotaryListInfoJob(){
return jobs.get("updateNotaryListInfoJob")
.incrementer(new RunIdIncrementer())
.start(readNotaryListStep())
.build();
}


/**
************************************ MULTIPLE JOB CONFIGURATION ********************************************
*/

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(15);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(30);
return taskExecutor;
}

@Bean
public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository){
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}


}

As you can see, the last two beans are the ThreadPoolTaskExecutor and my JobLauncher bean.

Then I changed SpringBatchExampleJobLauncher to use this launcher and execute my two jobs instead of one. This is what I did:

/**
 * This bean schedules and runs our Spring Batch jobs.
 */
@Component
public class SpringBatchExampleJobLauncher {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchExampleJobLauncher.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("updateNotaryDistrictsJob")
    private Job updateNotaryDistrictsJob;

    @Autowired
    @Qualifier("updateNotaryListInfoJob")
    private Job updateNotaryListInfoJob;

    @Scheduled(cron = "0/30 * * * * *")
    public void run1() {
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(System.currentTimeMillis()));
        JobParameters jobParameters = new JobParameters(confMap);
        try {
            jobLauncher.run(updateNotaryDistrictsJob, jobParameters);
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage());
        }
    }

    @Scheduled(cron = "0/50 * * * * *")
    public void run2() {
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(System.currentTimeMillis()));
        JobParameters jobParameters = new JobParameters(confMap);
        try {
            jobLauncher.run(updateNotaryListInfoJob, jobParameters);
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage());
        }
    }
}

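As you can see, I now inject the JobLauncher bean defined earlier and my two Job beans (defined in the configuration class). I then defined the run1() and run2() methods, which should run the two injected jobs whenever their CRON expressions match.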
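
A side note on the two schedules: as far as I understand Spring's six-field cron format, a seconds field written as start/step lists the matching seconds within each minute rather than a fixed period. The sketch below is plain Java and not Spring's parser; CronSecondsDemo and firingSeconds are hypothetical names used only for illustration, and only the simple start/step form is handled.

```java
import java.util.ArrayList;
import java.util.List;

public class CronSecondsDemo {

    /** Expands a "start/step" seconds field into the seconds (0-59) it matches. */
    static List<Integer> firingSeconds(String field) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        // Walk from the start value in increments of step, staying inside one minute.
        for (int s = start; s <= 59; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(firingSeconds("0/30")); // prints [0, 30]
        System.out.println(firingSeconds("0/50")); // prints [0, 50]
    }
}
```

Under that reading, run2() would fire at seconds 0 and 50 of every minute (gaps of 50 and then 10 seconds), and both methods would fire together at second 0.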

The problem is that I now get the following error in my stack trace, and nothing is executed:

***************************
APPLICATION FAILED TO START
***************************
Description:
The bean 'jobLauncher', defined in class path resource [org/springframework/batch/core/configuration/annotation/SimpleBatchConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [com/notariato/updateInfo/UpdateInfoBatchConfig.class] and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

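Basically, it seems to me that this error is telling me that the JobLauncher bean I am trying to inject into my launcher class has already been defined in my UpdateInfoBatchConfig configuration class. But that is exactly what I expect, because I defined the bean in the configuration class and then inject it into the launcher class that uses it.

What is wrong? What am I missing? How can I solve this problem?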

This happens because you define a JobLauncher bean in your application context while Spring Batch also defines that bean through @EnableBatchProcessing (see its Javadoc).
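
To make the collision concrete, here is a toy model in plain Java. It is not Spring's actual registry code: BeanNameCollisionDemo and ToyRegistry are hypothetical names, and the only thing modelled is that two definitions sharing a name are rejected when overriding is disabled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanNameCollisionDemo {

    /** Toy registry: maps a bean name to the class that defined it. */
    static class ToyRegistry {
        private final Map<String, String> definitions = new LinkedHashMap<>();
        private final boolean allowOverriding;

        ToyRegistry(boolean allowOverriding) {
            this.allowOverriding = allowOverriding;
        }

        void register(String beanName, String source) {
            String existing = definitions.get(beanName);
            // A second definition with the same name fails unless overriding is on.
            if (existing != null && !allowOverriding) {
                throw new IllegalStateException("The bean '" + beanName + "', defined in " + source
                        + ", could not be registered. A bean with that name has already been defined in "
                        + existing + " and overriding is disabled.");
            }
            definitions.put(beanName, source);
        }

        String sourceOf(String beanName) {
            return definitions.get(beanName);
        }
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry(false);
        // The application's own definition is registered first...
        registry.register("jobLauncher", "UpdateInfoBatchConfig");
        try {
            // ...then the framework tries to register its definition under the same name.
            registry.register("jobLauncher", "SimpleBatchConfiguration");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In other words, the message does not complain about the injection into the launcher class; it complains that two different configuration classes both contribute a bean named jobLauncher.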

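If you want to use a custom JobLauncher, you should provide a BatchConfigurer bean and override getJobLauncher. One way to do that is to have a configuration class extend DefaultBatchConfigurer and override createJobLauncher(). This is explained in more detail in the documentation.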
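
A minimal sketch of that approach, assuming Spring Batch 4.x (where DefaultBatchConfigurer has a protected createJobLauncher() method), and assuming the jobLauncher @Bean method is removed from UpdateInfoBatchConfig while the taskExecutor bean stays. The class name AsyncBatchConfigurer is hypothetical.

```java
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical class name; assumes Spring Batch 4.x on the classpath.
@Configuration
public class AsyncBatchConfigurer extends DefaultBatchConfigurer {

    // Assumption: this is the "taskExecutor" bean already declared in UpdateInfoBatchConfig.
    private final ThreadPoolTaskExecutor taskExecutor;

    public AsyncBatchConfigurer(ThreadPoolTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        // Reuse the JobRepository that the configurer has already created.
        jobLauncher.setJobRepository(getJobRepository());
        // Hand job executions to the pool so that launches do not block the scheduler thread.
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
```

With this in place, the single jobLauncher bean exposed by @EnableBatchProcessing should be the customized one, so the @Autowired JobLauncher field in SpringBatchExampleJobLauncher can stay as it is.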
