我是spring-batch的新手,在批处理中使用多个数据源时遇到问题。
让我解释一下。
我正在使用Spring Boot的服务器中的2个数据库。
到目前为止,我的RoutingDataSource实现一切都很好。
@Component("dataSource")
public class RoutingDataSource extends AbstractRoutingDataSource {
@Autowired
@Qualifier("datasourceA")
DataSource datasourceA;
@Autowired
@Qualifier("datasourceB")
DataSource datasourceB;
@PostConstruct
public void init() {
setDefaultTargetDataSource(datasourceA);
final Map<Object, Object> map = new HashMap<>();
map.put(Database.A, datasourceA);
map.put(Database.B, datasourceB);
setTargetDataSources(map);
}
@Override
protected Object determineCurrentLookupKey() {
return DatabaseContextHolder.getDatabase();
}
}
实现需要一个DatabaseContextHolder,这里是:
public class DatabaseContextHolder {
private static final ThreadLocal<Database> contextHolder = new ThreadLocal<>();
public static void setDatabase(final Database dbConnection) {
contextHolder.set(dbConnection);
}
public static Database getDatabase() {
return contextHolder.get();
}
}
当我在服务器上收到请求时,我有一个基本的拦截器,它根据请求中的一些输入设置当前数据库。使用方法DatabaseContextHolder.setDatabase(db);
,我的实际控制器一切都很好。
当我尝试用一个tasklet运行作业时,情况会变得更加复杂。
我的一个控制器启动这样的异步任务。
@GetMapping("/batch")
public void startBatch() {
return jobLauncher.run("myJob", new JobParameters());
}
@EnableBatchProcessing
@Configuration
public class MyBatch extends DefaultBatchConfigurer {
@Autowired private JobBuilderFactory jobs;
@Autowired private StepBuilderFactory steps;
@Autowired private MyTasklet tasklet;
@Bean
public Job job(Step step) {
return jobs.get("myJob").start(step).build();
}
@Bean
protected Step registeredDeliveryTask() {
return steps.get("myTask").tasklet(tasklet).build();
}
/** Overring the joblauncher get method to make it asynchornous */
@Override
public JobLauncher getJobLauncher() {
try {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(super.getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
} catch (Exception e) {
throw new BatchConfigurationException(e);
}
}
}
还有我的任务:
@Component
public class MyTasklet implements Tasklet {
@Autowired
private UserRepository repository;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)throws Exception {
//Do stuff with the repository.
}
但是RoutingDataSource不起作用,即使我在开始作业之前设置了上下文。例如,如果我将数据库设置为B,则repo将在数据库A上工作。它始终是选定的默认数据源。(因为这句话setDefaultTargetDataSource(datasourceA);
(
我试图通过在tasklet内部传递参数中的值来设置数据库,但仍然遇到了同样的问题。
@GetMapping("/batch")
public void startBatch() {
Map<String, JobParameter> parameters = new HashMap<>();
parameters.put("database", new JobParameter(DatabaseContextHolder.getCircaDatabase().toString()));
return jobLauncher.run("myJob", new JobParameters(parameters));
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)throws Exception {
String database =
chunkContext.getStepContext().getStepExecution().getJobParameters().getString("database");
DatabaseContextHolder.setDatabase(Database.valueOf(database));
//Do stuff with the repository.
}
我觉得问题是因为数据库是在另一个线程中设置的,因为我的工作是异步的。因此,它无法在启动作业之前获取数据库集。但到目前为止,我找不到任何解决方案。
问候
您的路由数据源用于Spring Batch的元数据,这意味着作业存储库将根据处理请求的线程与不同的数据库进行交互。批处理作业不需要这样做。您需要将SpringBatch配置为使用固定的数据源。