Spring Batch Async Processor - AsyncItemProcessor not taking effect, getting job status at startup



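I am trying to implement a simple Spring Batch job.

The goal is to run a select query against the DB using JpaPagingItemReader and to process all items asynchronously.

I followed this example -> https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java

If I run this code in a single thread, it works fine.

But when I add the async processor, it looks like the job changes its status to COMPLETED without waiting for process() to execute.

Am I missing something in this setup?

Here is the console output: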

2019-11-07 10:15:10.672 [main] T: INFO  o.s.batch.core.job.SimpleStepHandler - Step already complete or not restartable, so no action to execute: StepExecution: id=92, version=21, name=step1, status=COMPLETED, exitStatus=COMPLETED, readCount=1872, filterCount=0, writeCount=1872 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=19, rollbackCount=0, exitDescription= 
2019-11-07 10:15:10.693 [main] T: INFO  c.s.g.G.JobCompletionNotificationListener - !!! JOB FINISHED! Time to verify the results 
2019-11-07 10:15:10.702 [main] T: INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=reCalculate]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 74ms 
201

Here is my class:

import javax.persistence.EntityManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import com.batch.poc.common.Position;

@Component
public class PositionRecalculationBatchExecutorAsync {

    private static final Logger log = LoggerFactory.getLogger(PositionRecalculationBatchExecutorAsync.class);
    private static final int BATCH_SIZE = 450;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public EntityManagerFactory emf;

    @Bean
    @StepScope
    public JpaPagingItemReader<Position> read() throws Exception {
        JpaPagingItemReader<Position> positionBatchReader = new JpaPagingItemReader<>();
        positionBatchReader.setEntityManagerFactory(emf);
        StringBuilder query = new StringBuilder("SELECT ")
                .append(" pos ")
                .append(" FROM ")
                .append(" Position pos");

        positionBatchReader.setQueryString(query.toString());
        positionBatchReader.setPageSize(400);
        positionBatchReader.setSaveState(false);
        positionBatchReader.afterPropertiesSet();
        return positionBatchReader;
    }

    @Bean
    @StepScope
    public JpaItemWriter<Position> update() {
        // Not in use
        JpaItemWriter<Position> positionBatchWriter = new JpaItemWriter<>();
        return positionBatchWriter;
    }

    @Bean
    public Job reCalculate(JobCompletionNotificationListener listener) throws Exception {
        return this.jobBuilderFactory.get("reCalculate")
                .start(step1())
                .listener(listener)
                .build();
    }

    @Bean
    public PositionItemProcessor processor() {
        return new PositionItemProcessor();
    }

    @Bean
    public AsyncItemProcessor<Position, Position> asyncItemProcessor() throws Exception {
        AsyncItemProcessor<Position, Position> p = new AsyncItemProcessor<>();
        p.setDelegate(processor());
        p.setTaskExecutor(new SimpleAsyncTaskExecutor());
        p.afterPropertiesSet();
        return p;
    }

    @Bean
    @StepScope
    public AsyncItemWriter<Position> asyncItemWriter() {
        log.info("!!! JOB asyncItemWriter!!");
        AsyncItemWriter<Position> writer = new AsyncItemWriter<>();
        writer.setDelegate(this.update());
        return writer;
    }

    @Bean
    public Step step1() throws Exception {
        log.info("!!!  step1 !!");
        return this.stepBuilderFactory.get("step1")
                .<Position, Position>chunk(100)
                .reader(read())
                .processor((ItemProcessor) asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

}

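But when I add the async processor, it looks like the job changes its status to COMPLETED

It looks like you first ran the job with the regular processor (at which point the step executed and completed), and then ran the same job again after adding the async processor (at which point the step was already complete, hence the message "Step already complete or not restartable, so no action to execute").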
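The check behind that message can be pictured with a small model. This is only a simplified sketch in plain Java, not Spring Batch's actual source: the class StepRestartModel, its map, and the string keys are all hypothetical stand-ins for the job repository. The one assumption it encodes is that a step whose last recorded execution for the same job instance is COMPLETED gets skipped unless it is explicitly allowed to start again.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the restart check; not Spring Batch source code.
class StepRestartModel {

    enum Status { COMPLETED, FAILED }

    // Last recorded status per "jobInstance/stepName" key, standing in for the job repository.
    private final Map<String, Status> lastStatus = new HashMap<>();

    // Returns true if the step body ran, false if it was skipped as already complete.
    boolean runStep(String jobInstance, String stepName, boolean allowStartIfComplete) {
        String key = jobInstance + "/" + stepName;
        if (lastStatus.get(key) == Status.COMPLETED && !allowStartIfComplete) {
            return false; // corresponds to "Step already complete or not restartable"
        }
        lastStatus.put(key, Status.COMPLETED); // pretend the step ran successfully
        return true;
    }

    public static void main(String[] args) {
        StepRestartModel repo = new StepRestartModel();
        // The same job name with the same (empty) parameters maps to the same key here.
        System.out.println(repo.runStep("reCalculate{}", "step1", false)); // true: first run
        System.out.println(repo.runStep("reCalculate{}", "step1", false)); // false: skipped
        System.out.println(repo.runStep("reCalculate{}", "step1", true));  // true: re-run allowed
    }
}
```

In your log the job was launched with the parameters [{}], so under this model every launch lands on the same key, which is why the second run does nothing and finishes in 74ms.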

If you want the step to run again even though it has already completed, you can set the allowStartIfComplete flag on the step definition:

@Bean
public Step step1() throws Exception {
    log.info("!!!  step1 !!");
    return this.stepBuilderFactory.get("step1")
            .<Position, Position>chunk(100)
            .reader(read())
            .processor((ItemProcessor) asyncItemProcessor())
            .writer(asyncItemWriter())
            .allowStartIfComplete(true)
            .build();
}
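As for the worry that the job does not wait for process(): once the step actually runs, AsyncItemProcessor submits each item to the task executor and returns a Future, and AsyncItemWriter calls get() on each Future before handing the results to its delegate, so the chunk is not written until processing has finished. The handoff can be sketched in plain JDK code as below. The class AsyncHandoffSketch and its methods are hypothetical names with no Spring dependency, intended only to illustrate the idea rather than reproduce the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Plain-JDK sketch of the async processor/writer handoff; all names are hypothetical.
class AsyncHandoffSketch {

    // Processor side: submit each item to the executor and return at once with a Future.
    static <I, O> List<Future<O>> processAsync(List<I> chunk, Function<I, O> delegate,
                                               ExecutorService executor) {
        List<Future<O>> futures = new ArrayList<>();
        for (I item : chunk) {
            Callable<O> task = () -> delegate.apply(item);
            futures.add(executor.submit(task));
        }
        return futures;
    }

    // Writer side: block on every Future, then pass on the unwrapped, non-null results.
    static <O> List<O> unwrap(List<Future<O>> futures) {
        List<O> results = new ArrayList<>();
        for (Future<O> future : futures) {
            try {
                O result = future.get(); // waits until the item has been processed
                if (result != null) {
                    results.add(result);
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("Processing failed", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures =
                    processAsync(Arrays.asList(1, 2, 3), x -> x * 10, executor);
            System.out.println(unwrap(futures)); // [10, 20, 30]
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the processor's output type is a Future, the step can be declared as .<Position, Future<Position>>chunk(100), which should remove the need for the raw (ItemProcessor) cast.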
