如何异步启动Spring Batch Job



我遵循了春季批处理文档,无法使我的作业异步运行。

因此,我从web容器运行作业,该作业将通过REST端点触发。

我想在完成整个作业之前获得JobInstance ID以传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我遗漏了什么或错了什么。

BatchConfig使异步JobLauncher

@Configuration
public class BatchConfig {
@Autowired
JobRepository jobRepository;

@Bean
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}

控制器

@Autowired
JobLauncher jobLauncher;
@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().
addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
System.out.println(jobExecution.getJobInstance().getInstanceId());
System.out.println("OK RESPONSE");
return jobExecution.getJobInstance().getInstanceId();
}

JobBuilder作为组件

@Component
public class BatchComponent {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
public Job customJob(String someParam) throws Exception {
return jobBuilderFactory.get("personProcessor")
.incrementer(new RunIdIncrementer()).listener(listener())
.flow(personPorcessStep(someParam)).end().build();
}

private Step personPorcessStep(String someParam) throws Exception {
return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
.reader(new PersonReader(someParam)).faultTolerant().
skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
.writer(new PersonWriter()).build();
}

private JobExecutionListener listener() {
return new PersonJobCompletionListener();
}
private class PersonInput {
String firstName;
public PersonInput(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
private class PersonOutput {
String firstName;
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
public class PersonReader implements ItemReader<PersonInput> {
private List<PersonInput> items;
private int count = 0;
public PersonReader(String someParam) throws InterruptedException {
Thread.sleep(10000L); //to simulate processing
//manipulate and provide data in the read method
//just for testing i have given some dummy example
items = new ArrayList<PersonInput>();
PersonInput pi = new PersonInput("john");
items.add(pi);
}
@Override
public PersonInput read() {
if (count < items.size()) {
return items.get(count++);
}
return null;
}
}

public class DataDuplicateSkipper implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
if (exception instanceof DataIntegrityViolationException) {
return true;
}
return true;
}
}

private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {
@Override
public PersonOutput process(PersonInput item) throws Exception {
return null;
}
}
private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
@Override
public void write(List<? extends PersonOutput> results) throws Exception {
return;
}
}
private class PersonJobCompletionListener implements JobExecutionListener {
public PersonJobCompletionListener() {
}
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("JOB COMPLETED");
}
}
}

主要功能

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}

我使用基于注释的配置,并将gradle与以下批处理包一起使用。

compile('org.springframework.boot:spring-boot-starter-batch')

如果需要更多信息,请告诉我。我找不到任何例子来运行这个常见的用例。

谢谢你抽出时间。

试试这个,在您的配置中,您需要使用@Bean(name="myJobLauncher"(创建带有SimpleAsyncTaskExecutor的customJobLauncher,并且在您的控制器中也会使用@Qualifier

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}

在您的控制器中

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;

如果我查看您的代码,我会发现几个错误。首先,您的自定义配置没有加载,因为如果加载了,那么同一接口的重复bean实例的注入将失败。

spring boot有很多神奇之处,但如果你不告诉他做一些组件扫描,就不会像预期的那样加载任何东西。

我可以看到的第二个问题是BatchConfig类:它没有扩展DefaultBatchConfigure,也没有覆盖getJobLauncher((,所以即使启动魔术会加载所有内容,你也会得到默认的。这是一个有效的配置,它符合文档@EnableBatchProcessing API

BatchConfig

@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {
@Override
@Bean
public JobLauncher getJobLauncher() {
try {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
} catch (Exception e) {
log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
return super.getJobLauncher();
}
}
}

主要功能

@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}

尽管您有自定义的jobLauncher,但您正在使用Spring提供的默认jobLauncher运行作业。你能在你的控制器中自动连接simpleJobLauncher并尝试一下吗?

我知道这是一个老问题,但我还是会为未来的用户发布这个答案。

在查看了您的代码后,我不知道您为什么会出现这个问题,但我可以建议您使用限定符注释,再像这样使用ThreadPoolTaskExecutitor,看看它是否能解决您的问题。

您也可以查看本教程:异步春季批量作业处理以了解更多详细信息。它将帮助您异步配置春季批处理作业。这个教程是我写的。

@Configuration
public class BatchConfig {

@Autowired
private JobRepository jobRepository;

@Bean
public TaskExecutor threadPoolTaskExecutor(){

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(12);
executor.setCorePoolSize(8);
executor.setQueueCapacity(15);

return executor;
}

@Bean
public JobLauncher asyncJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();

jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
return jobLauncher;
}
}

JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);。Joblauncher将在作业完成后等待,然后再返回任何内容,这就是为什么如果这是您的问题,您的服务可能需要很长时间才能响应。如果您想要异步功能,您可能需要查看Spring的@EnableAsync&@Async

@启用异步

根据spring文档,要异步返回http请求的响应,需要使用org.springframework.core.task.SimpleSyncTaskExecutor.

spring TaskExecutor接口的任何实现都可以用于控制如何异步执行作业。

春季批处理文档

<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>

如果您使用Lombok,这可能会对您有所帮助:

TLDR:Lombok@AllArgsConstructor似乎无法很好地与@Qualifier注释配合使用编辑:如果您在lombok.config文件中启用了@Qualifier注释,则可以将@Qualifier@AllArgsConstructor一起使用,如下所示:

lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier

我知道这个老问题,但我有完全相同的问题,没有一个答案能解决它

我像这样配置了异步作业启动器,并添加了限定符以确保注入该作业启动器:

@Bean(name = "asyncJobLauncher")
public JobLauncher simpleJobLauncher(JobRepository jobRepository) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}

并像一样注入

@Qualifier("asyncJobLauncher")
private final JobLauncher jobLauncher;

在将Lombok@AllArgsConstructor更改为autowire后,我使用了它,注入了正确的作业启动器,现在该作业异步执行:

@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;

此外,我不必从DefaultBatchConfigurer扩展我的配置

相关内容

  • 没有找到相关文章

最新更新