弹簧批处理多进程,用于重载,每个进程下有多个线程



我有一个场景,我需要有大约50-60个不同的进程并发运行并执行一个任务。

每个进程必须使用sql查询从DB中获取数据,方法是传递一个值并获取数据,以便在后续任务中运行。select col_1, col_2, col_3 from table_1 where col_1 =:Process_1

@Bean
public Job partitioningJob() throws Exception {
return jobBuilderFactory.get("parallelJob")
.incrementer(new RunIdIncrementer())
.flow(masterStep())
.end()
.build();
}
@Bean
public Step masterStep() throws Exception {
//How to fetch data from configuration and pass all values in partitioner one by one.
// Can we give the name for every process so that it is helpful in logs and monitoring.
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep())
.partitioner("partition", partitioner())
.gridSize(10)
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}
@Bean
public Partitioner partitioner() throws Exception {
//Hit DB with sql query and fetch the data.
}
@Bean
public Step slaveStep() throws Exception {
return stepBuilderFactory.get("slaveStep")
.<Map<String, String>, Map<String, String>>chunk(1)
.processTask()
.build();
}

正如我们在Apache Camel中有Aggregator和parallelProcessing一样,Spring Batch是否有任何类似的功能来完成相同的工作?

我是Spring Batch的新手,目前正在探索它是否可以处理volume。因为这将是一个负载很重的应用程序,运行24*7,每个进程都需要并发运行,每个线程都应该能够支持一个进程内的多个线程。

是否有一种方法来监控这些进程,以便它无论如何终止,我应该能够重新启动那个特定的进程?请帮忙给出解决这个问题的办法。

请查找以上问题的答案。

  1. parallelProcessing -本地和远程分区支持并行处理,可以处理大量的卷,因为我们目前每天处理2到3亿个数据。

  2. 它可以处理大容量吗? -是的,它可以处理大容量,并且得到了很好的证明。

  3. 每个进程都需要并发运行,每个线程都应该能够在一个进程中支持多个线程——Spring批处理将根据你的ThreadPool来处理。请确保根据系统资源配置池。

  4. 是否有一种方法来监控这些进程,以便它被终止-是的。分区的每个并行进程都是一个步骤,您可以在BATCH_STEP_EXECUTION中监视并获得所有详细信息

  5. 应该能够重新启动特定的进程-是的,这是一个内置的功能,从失败的步骤重新启动。大容量作业我们总是使用容错,以便稍后处理拒绝。这也是内置的功能。

下面的项目示例

https://github.com/ngecom/springBatchLocalParition/tree/master

数据库添加- H2和创建表在资源文件夹中可用。我们总是倾向于使用数据源池,池大小将大于您的线程池大小。

示例项目摘要

  1. 从表"客户"中读取并划分为步骤分区
  2. 每一步分区写入新表"new_customer">
  3. JobConfiguration.java方法名"taskExecutor()">
  4. slaveStep()中可用的块大小。
  5. 你可以根据你的并行步骤计算内存大小,并配置为虚拟机最大内存。

查询帮助您在执行

后根据上述问题进行分析
SELECT * FROM NEW_CUSTOMER;   
SELECT * FROM BATCH_JOB_EXECUTION bje;
SELECT * FROM BATCH_STEP_EXECUTION bse WHERE JOB_EXECUTION_ID=2; 
SELECT * FROM BATCH_STEP_EXECUTION_CONTEXT bsec WHERE STEP_EXECUTION_ID=4; 

如果你想更改为MYSQL,添加下面的数据源

spring.datasource.hikari.minimum-idle=5 
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.idle-timeout=600000 
spring.datasource.hikari.max-lifetime=1800000 
spring.datasource.hikari.auto-commit=true 
spring.datasource.hikari.poolName=SpringBoot-HikariCP
spring.datasource.url=jdbc:mysql://localhost:3306/ngecomdev
spring.datasource.username=ngecom
spring.datasource.password=ngbilling

请始终参考下面的guthub URL。你会从中得到很多灵感。

https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples