弹簧批 - 循环读取器,处理器和写入器N次



在春季批处理中,如何循环读取器,处理器和写入器N次?

我的要求是:

我有"N"个客户/客户。 对于每个客户/客户,我需要从数据库(读取器)获取记录,然后我必须处理(处理者)客户/客户端的所有记录,然后我必须将记录写入文件(编写器)。

如何循环弹簧批处理作业N次?

AFAIK 恐怕没有针对这种情况的框架支持。至少不是你想要解决它的方式。 我建议以不同的方式解决问题:

选项 1

一次读取/处理/写入所有客户的所有记录。仅当它们都在同一个数据库中时,您才能执行此操作。否则我不推荐它,因为您必须配置 JTA/XA 事务,这不值得麻烦。

选项 2

为每个客户端运行一次作业(我认为是最佳选择)。将每个客户端的必要信息保存在不同的属性文件中(db 数据连接、按客户端筛选记录的值、特定于客户端可能需要的任何其他数据),并通过参数传递到它必须使用的客户端的作业。通过这种方式,您可以控制处理哪个客户端以及何时使用 bash 文件和/或 cron。如果使用 Spring Boot + Spring Batch,则可以将客户端配置存储在配置文件(application-clientX.properties)中并运行如下过程:

$>  java -Dspring.profiles.active="clientX"  
-jar "yourBatch-1.0.0-SNAPSHOT.jar"     
-next

奖金 - 选项 3

如果没有任何 abobe 符合您的需求,或者您坚持以您提出的方式解决它们提出的问题,那么您可以根据参数动态配置作业,并使用 JavaConf 为每个客户端创建一个步骤:

@Bean
public Job job(){
JobBuilder jb = jobBuilders.get("job");
for(Client c : clientsToProcess) {
jb.flow(buildStepByClient(c));
};
return jb.build();
}

再一次,我强烈建议你不要这样做:丑陋,违反框架哲学,难以维护,调试,你可能也必须在这里使用JTA/XA,...

我希望我有什么帮助!

本地分区将解决您的问题。

在你的分区程序中,你会把所有的客户端 ID 放在映射中,如下所示(只是伪代码),

public class PartitionByClient implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>();
int partitionNumber = 1;
for (String client: allClients) {
ExecutionContext value = new ExecutionContext();
value.putString("client", client);
result.put("Client [" + client+ "] : THREAD " + partitionNumber, value);
partitionNumber++;
}
} 
return result;
}
}

这只是一个伪代码。您必须查看分区的详细文档。

您必须在@StepScope中标记您的读取器,处理器和写入器(即哪个部分需要client的值)。读者将在 SQL 的WHERE子句中使用此client。您将在读取器等定义中使用@Value("#{stepExecutionContext[client]}") String client来注入此值。

现在最后一块,您将需要一个任务执行器,并且concurrencyLimit等于的客户端将并行启动,前提是您在主分区程序步骤配置中设置了此任务执行器。

@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
return simpleTaskExecutor;
}

如果您希望一次只运行一个客户端,concurrencyLimit将被1

最新更新