I have a Spring Batch application like the one below (table names and queries have been edited to generic names).
When I run it, it reads only 7,500 events — three times the chunk size — and fails to read the remaining records from the Oracle database. The table has 50 million records, which I need to copy into another NoSQL database.
@EnableBatchProcessing
@SpringBootApplication
@EnableAutoConfiguration
public class MultiThreadPagingApp extends DefaultBatchConfigurer {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Bean
    public DataSource dataSource() {
        final DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("oracle.jdbc.OracleDriver");
        dataSource.setUrl("jdbc:oracle:thin:@***********");
        dataSource.setUsername("user");
        dataSource.setPassword("password");
        return dataSource;
    }

    @Override
    public void setDataSource(DataSource dataSource) {}

    @Bean
    @StepScope
    ItemReader<UserModel> dbReader() throws Exception {
        JdbcPagingItemReader<UserModel> reader = new JdbcPagingItemReader<UserModel>();
        final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
        sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
        sqlPagingQueryProviderFactoryBean.setSelectClause("select * ");
        sqlPagingQueryProviderFactoryBean.setFromClause("from user");
        sqlPagingQueryProviderFactoryBean.setWhereClause("where id>0");
        sqlPagingQueryProviderFactoryBean.setSortKey("name");
        reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
        reader.setDataSource(dataSource);
        reader.setPageSize(2500);
        reader.setRowMapper(new BeanPropertyRowMapper<>(UserModel.class));
        reader.afterPropertiesSet();
        reader.setSaveState(true);
        System.out.println("Reading users anonymized in chunks of " + 2500);
        return reader;
    }

    @Bean
    public Dbwriter writer() {
        return new Dbwriter(); // I had another class for this
    }

    @Bean
    public Step step1() throws Exception {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.afterPropertiesSet();
        return this.stepBuilderFactory.get("step1")
                .<UserModel, UserModel>chunk(2500)
                .reader(dbReader())
                .writer(writer())
                .taskExecutor(taskExecutor)
                .build();
    }

    @Bean
    public Job multithreadedJob() throws Exception {
        return this.jobBuilderFactory.get("multithreadedJob")
                .start(step1())
                .build();
    }

    @Bean
    public PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    @Bean
    public JobRepository getJobRepo() throws Exception {
        return new MapJobRepositoryFactoryBean(getTransactionManager()).getObject();
    }

    public static void main(String[] args) {
        SpringApplication.run(MultiThreadPagingApp.class, args);
    }
}
Can you help me read all the records efficiently with Spring Batch, or suggest any other approach to this problem? I tried the approach described here: http://techdive.in/java/jdbc-handling-huge-resultset — reading and saving all records with a single-threaded application took 120 minutes. Since Spring Batch is well suited to this kind of task, I assume we can get it done in much less time.
You are setting the saveState flag to true (which, by the way, should be set before calling afterPropertiesSet on the JdbcPagingItemReader) while using this reader in a multi-threaded step. However, it is documented that this flag should be set to false in a multi-threaded context.
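For reference, a minimal sketch of the corrected ordering on the question's reader bean (the other setters are elided; false is the documented value for a multi-threaded step):

```
@Bean
@StepScope
ItemReader<UserModel> dbReader() throws Exception {
    JdbcPagingItemReader<UserModel> reader = new JdbcPagingItemReader<>();
    // ... setQueryProvider / setDataSource / setPageSize / setRowMapper as in the question ...
    reader.setSaveState(false);  // set BEFORE afterPropertiesSet, and false for multi-threaded steps
    reader.afterPropertiesSet();
    return reader;
}
```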
Multi-threading a database reader is usually not the best option; I would recommend using partitioning in your case instead.
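Partitioning here would mean splitting the 50 million rows into independent id ranges, one per worker step, each with its own reader bounded by that range (e.g. `where id >= :minId and id <= :maxId`). The range-splitting logic can be sketched in plain Java; the class and method names below are hypothetical, not part of Spring Batch:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper a Spring Batch Partitioner could delegate to:
// splits the table's key space into contiguous slices, one per worker.
public class IdRangePartitioner {

    // Splits the inclusive key range [min, max] into gridSize contiguous slices.
    public static List<long[]> split(long min, long max, int gridSize) {
        List<long[]> ranges = new ArrayList<>();
        long targetSize = (max - min) / gridSize + 1;
        long start = min;
        long end = start + targetSize - 1;
        while (start <= max) {
            ranges.add(new long[] { start, Math.min(end, max) });
            start += targetSize;
            end += targetSize;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // e.g. ids 1..100 split across 4 workers
        for (long[] r : IdRangePartitioner.split(1, 100, 4)) {
            System.out.println(r[0] + "-" + r[1]); // prints 1-25, 26-50, 51-75, 76-100
        }
    }
}
```

In a real job, this logic would live in a `Partitioner` implementation whose `partition(int gridSize)` method puts each slice's min/max id into a per-partition `ExecutionContext`, which a step-scoped reader then reads back to build its where clause.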
I ran into the same problem and fixed it by changing my sort key. I realized my previous sort key was not distinct for every record, so I replaced it with the ID, which is different for every record in the database.
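Applied to the question's reader, that is a one-line change in the query provider setup (assuming the `id` column uniquely identifies each row):

```
// A non-unique sort key ("name") lets pages overlap or skip rows;
// sorting on a unique column makes the paging deterministic.
sqlPagingQueryProviderFactoryBean.setSortKey("id"); // instead of "name"
```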