NullPointerException with Atomikos and Spring Batch JpaPagin



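I am using Atomikos with Spring Boot, Spring Batch, Spring Data JPA, Hibernate, and PostgreSQL. I have configured two data sources as XA data sources, each backed by an Atomikos pool.

I am running into two problems.

The first is that I cannot disable auto-commit on the Atomikos pool. I have searched everywhere and cannot find an auto-commit-like property that can be set to false.

The second is a bit more complicated. My distributed transaction manager works, since both of my databases are being updated.

However, when I use Spring Batch to read from my second database through JpaPagingItemReaderBuilder, I get a NullPointerException. Below is the configuration of the second data source and the second entity manager, which connect to the second database.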

@Bean
@ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.secondary")
public DataSource dataSourceSecondary() {
    return new AtomikosDataSourceBean();
}

@Bean
@PersistenceContext(unitName = "entityManagerSecondaryFactory")
public LocalContainerEntityManagerFactoryBean entityManagerSecondaryFactory() {
    Properties properties;
    HibernateJpaVendorAdapter jpaVendorAdapter;
    LocalContainerEntityManagerFactoryBean factory;
    factory = new LocalContainerEntityManagerFactoryBean();
    factory.setDataSource(this.dataSourceSecondary());
    factory.setPackagesToScan(this.deltaProperties.getRepositoryPackageScanSecondary());
    // Configure the JpaVendorAdapter
    jpaVendorAdapter = new HibernateJpaVendorAdapter();
    jpaVendorAdapter.setGenerateDdl(Boolean.parseBoolean(this.deltaProperties.getGenerateDdl()));
    jpaVendorAdapter.setDatabasePlatform(this.deltaProperties.getDatabase());
    factory.setJpaVendorAdapter(jpaVendorAdapter);
    // Properties of the JPA provider implementation
    properties = new Properties();
    properties.setProperty("hibernate.cache.use_query_cache", this.deltaProperties.getUseQueryCache());
    properties.setProperty("hibernate.cache.use_second_level_cache", this.deltaProperties.getUseSecondLevelCache());
    properties.setProperty("hibernate.cache.region.factory_class", this.deltaProperties.getRegionFactoryClass());
    properties.setProperty("hibernate.generate_statistics", this.deltaProperties.getGenerateStatistics());
    properties.setProperty("hibernate.jdbc.batch_size", Integer.toString(this.deltaProperties.getBatchSize()));
    properties.setProperty("hibernate.order_inserts", this.deltaProperties.getOrderInserts());
    properties.setProperty("hibernate.order_updates", this.deltaProperties.getOrderUpdates());
    properties.setProperty("hibernate.hbm2ddl.auto", this.deltaProperties.getHbm2ddlAuto());
    properties.setProperty("hibernate.id.optimizer.pooled.preferred",
            this.deltaProperties.getIdOptimizerPooledPreferred());
    properties.setProperty("hibernate.format_sql", this.deltaProperties.getFormatSql());
    properties.setProperty("hibernate.use_sql_comments", this.deltaProperties.getCommentsSql());
    properties.setProperty("hibernate.default_schema", this.deltaProperties.getDefautSchema());
    properties.setProperty("hibernate.dialect", this.deltaProperties.getDialect());
    properties.setProperty("hibernate.connection.provider_disables_autocommit",
            this.deltaProperties.getDisablesAutoCommit());
    properties.setProperty("org.hibernate.envers.audit_table_suffix", this.deltaProperties.getHisto());
    properties.setProperty("hibernate.jdbc.time_zone", this.deltaProperties.getUtc());
    properties.setProperty("hibernate.connection.autocommit", "false");
    properties.setProperty("hibernate.transaction.jta.plateform", AtomikosJtaPlatform.class.getName());

    factory.setJpaProperties(properties);
    JpaDialect dialect = new HibernateJpaDialect();
    factory.setJpaDialect(dialect);
    return factory;
}
...
@Bean
@StepScope
public ItemReader<ResponseEntity> responseSendItemReader(
        @Value("#{jobExecutionContext['num']}") String num) {
    Map<String, Object> parameterValues;
    parameterValues = new HashMap<>();
    // query parameter
    parameterValues.put("num", num);
    return new JpaPagingItemReaderBuilder<ResponseEntity>()
            .name("responseSendItemReader")
            .entityManagerFactory(this.entityManagerSecondaryFactory)
            .queryString("SELECT r FROM ResponseEntity r "
                    + " WHERE r.num = :num "
                    + " ORDER BY r.dateEmission ASC")
            .parameterValues(parameterValues)
            .pageSize(25)
            .build();
}

Here is the properties file for my second database:

spring.jta.atomikos.datasource.secondary.unique-resource-name=dataSourceSecondary
spring.jta.atomikos.datasource.secondary.max-pool-size=50
spring.jta.atomikos.datasource.secondary.min-pool-size=10
spring.jta.atomikos.datasource.secondary.max-life-time=0
spring.jta.atomikos.datasource.secondary.borrow-connection-timeout=10000
spring.jta.atomikos.datasource.secondary.defaultAutoCommit=false
spring.jta.atomikos.datasource.secondary.xa-data-source-class-name=@deltaie.ds.driver.class.name@
spring.jta.atomikos.datasource.secondary.xa-properties.url=url
spring.jta.atomikos.datasource.secondary.xa-properties.user=user
spring.jta.atomikos.datasource.secondary.xa-properties.password=pwd

Here is my transactions.properties file:

# Atomikos properties
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.log_base_dir=./atomikos/log
com.atomikos.icatch.log_base_name=deltaieTransaction
com.atomikos.icatch.default_jta_timeout=10000
com.atomikos.icatch.oltp_retry_interval=10000
com.atomikos.icatch.checkpoint_interval=500
com.atomikos.icatch.max_timeout=300000
com.atomikos.icatch.recovery_delay=10000
com.atomikos.icatch.max_actives=50

Here is the error I get:

onReadError GunSendResponseStepListener java.lang.NullPointerException: null
at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:192)
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:110)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:93)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy300.read(Unknown Source)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Thanks for your help.

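In case it helps anyone: the cause was that the method defining my JpaPagingItemReader returned the ItemReader interface rather than the JpaPagingItemReader class.

Because this method is step-scoped, Spring wraps the bean in a proxy built from the declared return type (the stack trace above shows a JdkDynamicAopProxy). When that type is ItemReader, the proxy does not expose ItemStream, so Spring Batch most likely never calls open() on the reader, its EntityManager is never created, and doReadPage fails with the NullPointerException. The recommended fix is to declare the return type as JpaPagingItemReader<ResponseEntity> instead of ItemReader<ResponseEntity>.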
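To illustrate the proxy mechanism without pulling in Spring, here is a minimal plain-Java sketch. Everything in it is hypothetical: `Reader`, `Stream`, and `PagingReader` are simplified stand-ins for Spring Batch's `ItemReader`, `ItemStream`, and `JpaPagingItemReader`, and `openIfStreamThenRead` only approximates what the step does, under the assumption that the framework opens a reader only when it can see that the reader is a stream.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class StepScopeProxyDemo {

    // Hypothetical stand-ins for Spring Batch's ItemReader and ItemStream.
    public interface Reader { String read(); }
    public interface Stream { void open(); }

    // Stand-in for JpaPagingItemReader: read() only works after open().
    public static class PagingReader implements Reader, Stream {
        private StringBuilder resource; // plays the role of the EntityManager

        @Override public void open() { resource = new StringBuilder("row"); }

        // Throws NullPointerException if open() was skipped.
        @Override public String read() { return resource.toString(); }
    }

    // Builds a JDK dynamic proxy that exposes only the given interfaces.
    public static Object proxyFor(Object target, Class<?>... exposed) {
        return Proxy.newProxyInstance(
                target.getClass().getClassLoader(),
                exposed,
                (proxy, method, args) -> {
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // rethrow the target's own exception
                    }
                });
    }

    // Approximates the step: open the reader only if it is visibly a Stream, then read.
    public static String openIfStreamThenRead(Object reader) {
        if (reader instanceof Stream) {
            ((Stream) reader).open();
        }
        return ((Reader) reader).read();
    }

    public static void main(String[] args) {
        // Proxy exposing both interfaces: open() is called, read() succeeds.
        Object full = proxyFor(new PagingReader(), Reader.class, Stream.class);
        System.out.println(openIfStreamThenRead(full));

        // Proxy exposing only Reader: open() is skipped, read() fails.
        Object narrow = proxyFor(new PagingReader(), Reader.class);
        try {
            openIfStreamThenRead(narrow);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: open() was never called");
        }
    }
}
```

The narrow proxy corresponds to a step-scoped bean method declared as returning the interface only; the full proxy corresponds to declaring the concrete class, whose interfaces include the stream contract.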
