春季批处理分区连接覆盖 RowMapper 值,因此获得单个数组而不是多个数组



我正在使用Spring BatchPostgres读取数据并将其写入MongoDB。就我而言,Employee有 3 种不同类型的电子邮件地址 1( 家庭住址 2( 办公室地址 3( 员工电子邮件表中的社会地址。

由于我们在数据库中几乎有10 lacs名员工,因此使用自定义分区Postgres中提取数据并与employee_email(后来employee_phone也加入(,以便在处理器中具有Mongo POJO的映射并保存到MongoDB。

现在的问题是我需要将员工电子邮件记录作为数组嵌入到联系人中,但使用当前逻辑将其另存为单独的集合

我们如何解决这个问题?

select * from root.employees c
full outer join root.employee_email ce
on c.employee_id = ce.employee_id
order by c.employee_id limit 1000 offset 0;

现在,当数据保存到数据库中时,只有电子邮件被保存,并且似乎其他 2 个被覆盖。

我需要如何处理这个问题,看起来EmployeeRowMapper正在覆盖所有其他电子邮件地址。我将如何解决这个问题?

员工.工作

@Configuration
public class EmployeeJob {
private static Logger logger = LoggerFactory.getLogger(EmployeeJob.class);
private static final Integer CHUNK_SIZE = 1000;
@Autowired
private DataSource dataSource;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public EmployeesPartitions EmployeesPartition() {
return new EmployeesPartitions();
}
@Bean
public EmployeesJobListener EmployeesJobListener() {
return new EmployeesJobListener();
}
@Bean("readEmployeeJob")
public Job readEmployeeJob() throws Exception {
return jobBuilderFactory.get("readEmployeeJob")
.incrementer(new RunIdIncrementer())
.start(EmployeeStepOne())
.listener(EmployeesJobListener())
.build();
}
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");
taskExecutor.setConcurrencyLimit(Runtime.getRuntime().availableProcessors());
taskExecutor.setThreadGroupName("Employees-Thread");
taskExecutor.setDaemon(false);
taskExecutor.setThreadPriority(5);
return taskExecutor;
}
@Bean
public Step EmployeeStepOne() throws Exception {            
return stepBuilderFactory.get("EmployeeStepOne")
.partitioner(slaveStep().getName(), EmployeesPartition())
.step(slaveStep())
.gridSize(10)
.taskExecutor(simpleAsyncTaskExecutor())
.build();
}
// slave step
@Bean
public Step slaveStep() throws Exception {
return stepBuilderFactory.get("slaveStep")
.<EmployeesDTO, EmployeesDTO>chunk(CHUNK_SIZE)
.reader(EmployeeReader(null, null))
.writer(EmployeeWriter())
.build();
}

// Readers
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
@Value("#{stepExecutionContext['limit']}") Long limit,
@Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {
String sql = "select * from root.Employees c "
+ "full outer join root.Employee_email ce "
+ "on c.Employee_id = ce.Employee_id "
+ "order by c.Employee_id limit " + limit +" offset "+ offset;
logger.info("Employees SQL = {} ", sql);
JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource);
reader.setSql(sql);
reader.setRowMapper(new EmployeeRowMapper());
reader.afterPropertiesSet();
return reader;
}
// Processors
@Bean
public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
return new EmployeesProcessor();
}
// Writers
@Bean
public ItemWriter<EmployeesDTO> EmployeeWriter() {
return new EmployeeWriter();
}
}

行映射器.java

public class EmployeeRowMapper implements RowMapper<Employee> {
@Override
public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
// EmployeeEmail email = new EmployeeEmail();
....
....
....
....
....
List<EmployeeEmail> employeeEmails = new ArrayList<>();
employeeEmails.add(email);
Employee dto = Employee.builder()
.businessTitle(rs.getString(""))
...........
...........
...........
...........
...........
...........
...........
.employeeEmails(employeeEmails)
.build();
return dto;
}
}

RowMapper用于将单个数据库行映射到 POJO。因此,除非每一行都包含不同列中的所有电子邮件,例如id,name,email1,email2,email3,否则您尝试执行的操作将不起作用。

如果每个项目都有 3 行电子邮件,则需要使查询仅返回id,name并使用其他查询来获取电子邮件。此附加查询可以在映射器本身或项目处理器中完成,如驱动查询模式中所述。

最新更新