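Before the Spring Batch job runs, I have an import table containing all items that need to be imported into our system. At this point it contains only items that do not already exist in our system.
Next, I have a Spring Batch job that reads from this import table using JpaPagingItemReader. After processing, it writes to the database using an ItemWriter.
I run it with a page size and chunk size of 10000. On MySQL InnoDB this works absolutely fine. I can even use multiple threads and everything works.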
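With a page size of 10000 the reader does not run one query; it runs one query per page. A rough sketch of what that means for this job, assuming (as JpaPagingItemReader does via setFirstResult/setMaxResults) that page N is requested with OFFSET N × pageSize and LIMIT pageSize, and using a hypothetical total of 25,000 rows:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the page queries an offset-based paging reader needs.
// Assumption: page N is fetched as "LIMIT pageSize OFFSET N * pageSize".
class PageQueryPlan {

    /** Returns the OFFSET of every page needed to cover totalRows rows. */
    static List<Integer> pageOffsets(int totalRows, int pageSize) {
        List<Integer> offsets = new ArrayList<>();
        for (int page = 0; page * pageSize < totalRows; page++) {
            offsets.add(page * pageSize);
        }
        return offsets;
    }

    public static void main(String[] args) {
        int pageSize = 10_000;   // page size from the job configuration
        int totalRows = 25_000;  // hypothetical number of rows to import
        for (int offset : pageOffsets(totalRows, pageSize)) {
            System.out.println("... LIMIT " + pageSize + " OFFSET " + offset);
        }
    }
}
```

Every one of those queries is evaluated independently, so nothing ties the row order of one page to the next unless the query itself fixes it.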
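But now we are migrating to PostgreSQL, and the same batch job runs into some very strange problems. It tries to insert duplicates into our system. These are naturally rejected by a unique index constraint, and an error is thrown. Since the import table is verified to contain only non-existing items before the batch job starts, the only explanation I can think of is that the JpaPagingItemReader reads some rows from the import table more than once when running on Postgres. But why would it do that?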
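One way to test that hypothesis is to record the key of every item the reader returns and flag repeats. The sketch below is a hypothetical, standard-library-only helper (DuplicateReadDetector is not part of Spring Batch); in the real job it could be called from an ItemReadListener's afterRead method with whatever unique key ImportDTO has. It uses concurrent sets so it stays safe with the multi-threaded step.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical diagnostic: records every key the reader hands out and
// remembers the keys that were seen more than once. Thread-safe.
class DuplicateReadDetector<K> {

    private final Set<K> seen = ConcurrentHashMap.newKeySet();
    private final Set<K> duplicates = ConcurrentHashMap.newKeySet();

    /** Call once per item read; returns true if the key was already read. */
    boolean record(K key) {
        boolean firstTime = seen.add(key);
        if (!firstTime) {
            duplicates.add(key);
        }
        return !firstTime;
    }

    /** Keys that were read at least twice so far. */
    Set<K> duplicates() {
        return duplicates;
    }

    public static void main(String[] args) {
        DuplicateReadDetector<Long> detector = new DuplicateReadDetector<>();
        // Hypothetical sequence of import IDs as returned by successive pages.
        for (long id : List.of(1L, 2L, 3L, 4L, 8L, 9L, 10L, 1L, 5L, 6L)) {
            if (detector.record(id)) {
                System.out.println("read twice: " + id);
            }
        }
    }
}
```

If this reports repeats on PostgreSQL but not on MySQL, the duplicates come from the reader rather than from the import data.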
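I have tried many settings. Lowering the chunk and page size to around 100 only makes the import slower; the error stays the same. Running single-threaded instead of multi-threaded only makes the error appear later. So what could possibly be the reason that my JpaPagingItemReader reads the same items multiple times, and only on PostgreSQL? The select statement backing the reader is simple; it is a NamedQuery: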
@NamedQuery(name = "ImportDTO.findAllForInsert",
        query = "select h from ImportDTO h where h.toBeImported = true")
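Also note that the batch job does not change the toBeImported flag while it runs, so this query should return the same results before, during, and after the batch job.
Any insights, tips, or help are greatly appreciated!
Here is the batch configuration code: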
import javax.persistence.EntityManagerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private OrganizationItemWriter organizationItemWriter;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Autowired
    private OrganizationUpdateProcessor organizationUpdateProcessor;

    @Autowired
    private OrganizationInsertProcessor organizationInsertProcessor;

    private Integer organizationBatchSize = 10000;
    private Integer organizationThreadSize = 3;
    private Integer maxThreadSize = organizationThreadSize;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findNewImportsToImport() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForInsert");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public JpaPagingItemReader<ImportDTO> findImportsToUpdate() throws Exception {
        JpaPagingItemReader<ImportDTO> databaseReader = new JpaPagingItemReader<>();
        databaseReader.setEntityManagerFactory(entityManagerFactory);
        JpaQueryProviderImpl<ImportDTO> jpaQueryProvider = new JpaQueryProviderImpl<>();
        jpaQueryProvider.setQuery("ImportDTO.findAllForUpdate");
        databaseReader.setQueryProvider(jpaQueryProvider);
        databaseReader.setPageSize(organizationBatchSize);
        // must be set to false if multi threaded
        databaseReader.setSaveState(false);
        databaseReader.afterPropertiesSet();
        return databaseReader;
    }

    @Bean
    public OrganizationItemWriter writer() throws Exception {
        return organizationItemWriter;
    }

    @Bean
    public StepExecutionNotificationListener stepExecutionListener() {
        return new StepExecutionNotificationListener();
    }

    @Bean
    public ChunkExecutionListener chunkListener() {
        return new ChunkExecutionListener();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(maxThreadSize);
        return taskExecutor;
    }

    @Bean
    public Job importOrganizationsJob(JobCompletionNotificationListener listener) throws Exception {
        return jobBuilderFactory.get("importAndUpdateOrganizationJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(importNewOrganizationsFromImports())
                .next(updateOrganizationsFromImports())
                .build();
    }

    @Bean
    public Step importNewOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("importNewOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findNewImportsToImport())
                .processor(organizationInsertProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }

    @Bean
    public Step updateOrganizationsFromImports() throws Exception {
        return stepBuilderFactory.get("updateOrganizationsFromImports")
                .<ImportDTO, Organization> chunk(organizationBatchSize)
                .reader(findImportsToUpdate())
                .processor(organizationUpdateProcessor)
                .writer(writer())
                .taskExecutor(taskExecutor())
                .listener(stepExecutionListener())
                .listener(chunkListener())
                .throttleLimit(organizationThreadSize)
                .build();
    }
}
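You need to add an ORDER BY clause to the select. JpaPagingItemReader fetches each page with a separate query (offset = page number × page size, limit = page size), and without an ORDER BY the database is free to return the rows in a different order for each of those queries. Rows can then shift between pages, so some are read twice and others are skipped. Order by a unique column (or end the ORDER BY with one) so the order is fully deterministic.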
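To see why the missing ORDER BY produces exactly the duplicate inserts described in the question, here is a small standard-library simulation. It is not real database behaviour: "no ORDER BY" is modelled, purely as an assumption, by each page query scanning the table from a different starting position (a hypothetical shift of 3 rows per page), while "ORDER BY id" sorts before applying OFFSET/LIMIT.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simulation of LIMIT/OFFSET paging with and without a stable ORDER BY.
class OffsetPagingDemo {

    /** One page query: order the rows, then apply OFFSET and LIMIT. */
    static List<Integer> page(List<Integer> table, int pageNo, int pageSize, boolean ordered) {
        List<Integer> scan = new ArrayList<>(table);
        if (ordered) {
            Collections.sort(scan);                  // ORDER BY id
        } else {
            Collections.rotate(scan, -3 * pageNo);   // unspecified order, differs per query
        }
        int from = pageNo * pageSize;                // OFFSET
        int to = Math.min(from + pageSize, scan.size()); // LIMIT
        return new ArrayList<>(scan.subList(from, to));
    }

    /** Reads the whole table page by page, like an offset-based paging reader. */
    static List<Integer> readAll(List<Integer> table, int pageSize, boolean ordered) {
        List<Integer> read = new ArrayList<>();
        for (int pageNo = 0; pageNo * pageSize < table.size(); pageNo++) {
            read.addAll(page(table, pageNo, pageSize, ordered));
        }
        return read;
    }

    public static void main(String[] args) {
        List<Integer> table = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // prints "no ORDER BY: [1, 2, 3, 4, 8, 9, 10, 1, 5, 6]" (1 twice, 7 never)
        System.out.println("no ORDER BY: " + readAll(table, 4, false));
        // prints "ORDER BY id: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"
        System.out.println("ORDER BY id: " + readAll(table, 4, true));
    }
}
```

Applied to the question, this means extending the named query with an ORDER BY on a unique column, for example select h from ImportDTO h where h.toBeImported = true order by h.id, assuming ImportDTO has a unique id property (the property name here is hypothetical). Smaller pages or fewer threads only change how often the order drifts between queries, which matches the observation that they delay the error rather than remove it.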