org.springframework.batch.core.JobExecutionException:分区处理程序在



我正在使用Spring Boot使用Spring Batch。在此示例中,我实现了自定义分区,因为员工表具有 UUID 值作为 PK。

Error:
org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30002ms.
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:307) ~[spring-jdbc-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400) ~[spring-tx-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373) ~[spring-tx-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at com.sun.proxy.$Proxy74.getTransaction(Unknown Source) ~[na:na]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137) ~[spring-tx-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:138) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:135) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30002ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:689) ~[HikariCP-3.4.2.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:196) ~[HikariCP-3.4.2.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:161) ~[HikariCP-3.4.2.jar:na]
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-3.4.2.jar:na]
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:263) ~[spring-jdbc-5.2.4.RELEASE.jar:5.2.4.RELEASE]
... 25 common frames omitted
2020-05-16 20:55:22.675  INFO 31056 --- [cTaskExecutor-7] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--9] executed in 32s370ms
###################### 80
2020-05-16 20:55:22.787  INFO 31056 --- [TaskExecutor-10] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:70}] to localhost:27017
2020-05-16 20:55:22.815  INFO 31056 --- [TaskExecutor-10] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--8] executed in 32s510ms
###################### 50
###################### 70
2020-05-16 20:55:22.839  INFO 31056 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--5] executed in 32s539ms
###################### 20
###################### 40
2020-05-16 20:55:22.845  INFO 31056 --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--7] executed in 32s540ms
###################### 10
2020-05-16 20:55:22.851  INFO 31056 --- [cTaskExecutor-9] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--2] executed in 32s546ms
###################### 30
2020-05-16 20:55:22.856  INFO 31056 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--1] executed in 32s556ms
###################### 100
###################### 60
2020-05-16 20:55:22.860  INFO 31056 --- [cTaskExecutor-6] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:71}] to localhost:27017
2020-05-16 20:55:22.864  INFO 31056 --- [cTaskExecutor-5] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--3] executed in 32s559ms
2020-05-16 20:55:22.867  INFO 31056 --- [cTaskExecutor-6] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--4] executed in 32s565ms
2020-05-16 20:55:22.874  INFO 31056 --- [cTaskExecutor-8] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--6] executed in 32s569ms
2020-05-16 20:55:22.883  INFO 31056 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Step: [slaveStep:Thread--10] executed in 32s583ms
2020-05-16 20:55:22.885 ERROR 31056 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step contactStepOne in job readContactJob
org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at com.sun.proxy.$Proxy71.run(Unknown Source) [na:na]
at com.example.PostgresMongoContactsMigrationApplication.run(PostgresMongoContactsMigrationApplication.java:49) [classes/:na]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at com.example.PostgresMongoContactsMigrationApplication.main(PostgresMongoContactsMigrationApplication.java:38) [classes/:na] 

员工分区.java

public class EmployeePartitions implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitionMap = new HashMap<>();
int limit = 10;
int offset = 0;
int increment = 10;
int counter = 0;
for (int i = 0; i < 100; i += increment) {
counter++;
if (i == 0)
offset = i;
else
offset = i + 1;
limit = i + increment;
System.out.println("OFFSET " + offset + ", LIMIT = " + limit);
ExecutionContext context = new ExecutionContext();
context.put("limit", limit);
context.put("offset", offset);
partitionMap.put("Thread--" + counter, context);
System.out.println(partitionMap);
}
System.out.println(partitionMap.size());
return partitionMap;
}
}

员工工作.java

@Configuration
public class EmployeeJob {
private static final Integer chunkSize = 200;
@Autowired
private DataSource dataSource;
@Autowired
private Environment env;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public EmployeesPartitions EmployeesPartition() {
return new EmployeesPartitions();
}

@Bean("readEmployeeJob")
public Job readEmployeeJob() throws Exception {
return jobBuilderFactory.get("readEmployeeJob")
.incrementer(new RunIdIncrementer())
.start(EmployeeStepOne())
.build();
}
@Bean
public Step EmployeeStepOne() throws Exception {        
return stepBuilderFactory.get("EmployeeStepOne")
.partitioner(slaveStep().getName(), EmployeesPartition())
.step(slaveStep())
.gridSize(1)
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}

// slave step
@Bean
public Step slaveStep() throws Exception {
return stepBuilderFactory.get("slaveStep")
.<EmployeesDTO, EmployeesDTO>chunk(chunkSize)
.reader(EmployeeReader(null, null))
.writer(EmployeeWriter())
.build();
}
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<EmployeesDTO> EmployeeReader(
@Value("#{stepExecutionContext['limit']}") Long limit,
@Value("#{stepExecutionContext['offset']}") Long offset) throws Exception {
JdbcCursorItemReader<EmployeesDTO> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource);
reader.setSql("select * from scv_owner.Employees order by Employee_id limit "+ limit +" offset "+offset);
reader.setRowMapper(new EmployeeRowMapper());
reader.afterPropertiesSet();
return reader;
}
@Bean
public ItemProcessor<EmployeesDTO, EmployeesDTO> EmployeeProcessor() {
return new EmployeesProcessor();
}
@Bean
public ItemWriter<EmployeesDTO> EmployeeWriter() {
return new EmployeeWriter();
}
}

最佳尺寸是多少?

没有最佳大小,这取决于用例。为了获得最大吞吐量,应设置池大小,例如,如果需要最大吞吐量,每个分区都可以有自己的连接,否则根据分区完成所需的时间选择超时。

例如,如果您有 10 个分区,池大小为 5,则 5 个分区将正常工作,另外 5 个分区将等待。现在,如果分区最多需要 1 分钟才能完成,则可以将超时设置为 1 分钟(或者可能更长一点(,以便等待的分区不会超时,即在超时之前将连接返回到池中。

最新更新