我们使用具有 2 个数据源的 Spring Batch:1 个用于读取(源数据库),1 个用于写入(目标数据库)。
Spring Batch 配置为使用目标数据源/事务管理器进行JobRepository
和JobExplorer
:
@EnableBatchProcessing(transactionManagerRef = "destinationTransactionManager", dataSourceRef = "destinationDataSource")
对于作业配置,JpaCursorItemReader
配置为使用属于源数据库的EntityManagerFactory
(具有属于源数据库的PlatformTransactionManager
)。
JpaItemWriter
配置为使用属于目标数据库的EntityManagerFactory
和PlatformTransactionManager
。此PlatformTransactionManager
与@EnableBatchProcessing
中使用的相同。
我们面向块的步骤使用属于目标数据库的PlatformTransactionManager
(与@EnableBatchProcessing
中使用的相同)。
我的问题是:这是一个正确的设置(特别是关于事务管理)?到目前为止,它还没有给我们带来任何问题。我有点担心,因为读者端使用不同的数据源。
我的假设是这应该有效,因为块的PlatformTransactionManager
与用于JobRepository
和JpaItemWriter
的相同。所以我假设当某些事情失败时,回滚进度(在元数据表中)和写入的项目至少应该工作,因为它们使用相同的数据源和事务管理器。此外,JpaCursorItemReader
似乎没有交易意识。
我们的配置如下所示(略微修改以省略域语言):
@Configuration
@AllArgsConstructor
@EnableBatchProcessing(transactionManagerRef = "destinationTransactionManager", dataSourceRef = "destinationDataSource")
public class JobConfiguration {
@Bean
public JpaCursorItemReader<SourceEntity> sourceReader(
@Qualifier("sourceEntityManagerFactory") final LocalContainerEntityManagerFactoryBean sourceEntityManagerFactory
) {
return new JpaCursorItemReaderBuilder<SourceEntity>()
.name("SourceEntity")
.entityManagerFactory(Objects.requireNonNull(sourceEntityManagerFactory.getObject()))
.queryString("from SourceEntity")
.build();
}
@Bean
public JpaItemWriter<DestinationEntity> destinationWriter(
@Qualifier("destinationEntityManagerFactory")
final LocalContainerEntityManagerFactoryBean destinationEntityManagerFactory
) {
return new JpaItemWriterBuilder<DestinationEntity>()
.entityManagerFactory(Objects.requireNonNull(destinationEntityManagerFactory.getObject()))
.build();
}
@Bean
public Step step(
@Qualifier("sourceReader") final JpaCursorItemReader<SourceEntity> reader,
@Qualifier("destinationWriter") final JpaItemWriter<DestinationEntity> writer,
final CustomProcessor processor, // implementation omitted for brevity
@Qualifier("destinationTransactionManager") final PlatformTransactionManager transactionManager,
final JobRepository jobRepository
) {
return new StepBuilder("step", jobRepository)
.<SourceEntity, DestinationEntity>chunk(10, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job job(final Step step,
final JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.incrementer(new RunIdIncrementer())
.flow(step)
.end()
.build();
}
}
这按预期工作,但我想知道这是否是有关 tx 管理的正确设置。
对于这种配置(从不同的数据源读取/写入,或写入两个不同的事务资源),最大的风险是业务数据和技术元数据在发生故障时不同步。
您需要做的是使用一个JtaTransactionManager
来配置Spring Batch,该来协调同一步骤中涉及的所有事务资源的事务管理器。
如果配置正确,Spring Batch 保证的是读取器、写入器和作业存储库交互在同一事务中执行,以便业务数据和技术元数据作为一个单元提交或回滚。