春季批处理 Jdbc 分页项目读取器无法读取所有事件



我有像下面这样的弹簧批处理应用程序(表名和查询针对一些通用名称进行了编辑)

当我执行这个程序时,它能够读取 7500 个事件,即块大小的 3 倍,并且无法读取 Oracle 数据库中的剩余记录。我有一个包含 5000 万条记录的表,并且能够复制到另一个 noSql 数据库。

@EnableBatchProcessing
@SpringBootApplication
@EnableAutoConfiguration
public class MultiThreadPagingApp extends DefaultBatchConfigurer{
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
public DataSource dataSource() {
    final DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("oracle.jdbc.OracleDriver");
    dataSource.setUrl("jdbc:oracle:thin:@***********");
    dataSource.setUsername("user");
    dataSource.setPassword("password");
    return dataSource;
}

@Override
public void setDataSource(DataSource dataSource) {}
@Bean
@StepScope
ItemReader<UserModel> dbReader() throws Exception {
    JdbcPagingItemReader<UserModel> reader = new JdbcPagingItemReader<UserModel>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();        
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select * ");
    sqlPagingQueryProviderFactoryBean.setFromClause("from user");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where id>0");
    sqlPagingQueryProviderFactoryBean.setSortKey("name");
    reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    reader.setDataSource(dataSource);
    reader.setPageSize(2500);       
    reader.setRowMapper(new BeanPropertyRowMapper<>(UserModel.class));
    reader.afterPropertiesSet();
    reader.setSaveState(true);
    System.out.println("Reading users anonymized in chunks of {}"+ 2500);
    return reader;
}

@Bean
public Dbwriter writer() {
    return new Dbwriter(); // I had another class for this
}   
@Bean
public Step step1() throws Exception {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(4);
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.afterPropertiesSet();
    return this.stepBuilderFactory.get("step1")
            .<UserModel, UserModel>chunk(2500)
            .reader(dbReader())
            .writer(writer())
            .taskExecutor(taskExecutor)
            .build();
}

@Bean
public Job multithreadedJob() throws Exception {
    return this.jobBuilderFactory.get("multithreadedJob")
            .start(step1())
            .build();
} 

@Bean
public PlatformTransactionManager getTransactionManager() {
    return new ResourcelessTransactionManager();
}
@Bean
public JobRepository getJobRepo() throws Exception {
    return new MapJobRepositoryFactoryBean(getTransactionManager()).getObject();
}
public static void main(String[] args) {
    SpringApplication.run(MultiThreadPagingApp.class, args);
}

}

你能帮我如何使用弹簧批处理有效地读取所有记录,或者帮助我处理这个问题的任何其他方法。我尝试过这里提到的一个方法:http://techdive.in/java/jdbc-handling-huge-resultset使用单线程应用程序读取和保存所有记录需要 120 分钟。由于春季批次最适合这种情况,我认为我们可以在短时间内处理这种情况。

您正在将saveState标志设置为 true(顺便说一句,应该在JdbcPagingItemReader上调用 afterPropertiesSet 之前设置它,并在多线程步骤中使用此读取器。但是,记录了在多线程上下文中将此标志设置为 false。

使用

数据库读取器的多线程通常不是最佳选择,我建议您在您的情况下使用分区。

我遇到了同样的问题,我通过更改我的排序键来修复它。我意识到前一个对于每个数据记录都没有不同。所以我用 ID 替换它,因为数据库的每一条记录都不同

最新更新