spring Kafka与Database/MQ交互的事务管理



我们正在尝试在我们的spring Kafka消费者中实现事务管理。

我们有Kafka消费者从主题A ->数据库更新/插入->生成主题b上的Kafka消息

我面临的问题是,当数据库事务提交失败,发送操作主题B不是回滚。因此系统处于不一致状态。

其他场景均按预期工作。

例如:

  1. 读取kafka ->Db相互作用->kafka send:如果kafka send失败,db txn不提交,读取消息的偏移量不提交。

  2. 读取kafka ->Db相互作用->kafka send:如果消费消息的偏移量提交失败,kafka send和db不提交。

PS:我知道kafka不支持XA事务。我确实看到了一些提到使用ChaintedTransactionManager的资源,根据文档,它已经从spring data core 2.5版本弃用了,所以最好不要使用它。如有任何建议,欢迎。

我把代码片段放在-

下面主:

@KafkaHandler
@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void receiveCreationMessage(String event){

// saves into database
dao.saveDraft(event);

// sends kafka message
sendMQAdapterKafkaEvent(initiateReqMq, convertedEvent);

}

@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void saveDraft(@NonNull String event) {

entityManager.joinTransaction();
entityManager.persist(event);
}

@Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
public void sendMQAdapterKafkaEvent(String mqName, Object message) throws 

kafkaTemplate.send(topicName, msgKey, message)
}

Application.properties

spring.kafka.producer.transaction-id-prefix=txnId-
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.properties.transactional-id=trans-id-

Kafka监听器容器工厂:

@Bean(KafkaConstants.ContainerFactoryNames.MANUAL_COMMIT_CONTAINER_FACTORY)
@Autowired()
public ConcurrentKafkaListenerContainerFactory<Object, Object> manualCommitKafkaListenerContainerFactory(
@NonNull final ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
@NonNull final KafkaProperties kafkaProperties,
@NonNull final RecordMessageConverter converter,
@NonNull final ErrorHandler errorHandler,
@NotNull final RetryTemplate retryTemplate,
@NotNull @Qualifier("kafkaTxM") @Lazy final KafkaTransactionManager kafkaTransactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
val consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
val consumerFactory = new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0L)));
factory.setAfterRollbackProcessor(new TestRollbackProcesor(errorHandler, kafkaTransactionManager, new FixedBackOff(0L, 0L)));
configurer.configure(factory, consumerFactory);
return factory;
}

Kafka producer factory -

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(final KafkaProperties kafkaProperties) throws IOException {
val factory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
val transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
factory.transactionCapable();
return factory;
}

Kafka tx manager bean -

@Bean(name = "kafkaTxM")
public KafkaTransactionManager kafkaTransactionManager(final MyConfiguration myConfig,
final KafkaProperties kafkaProperties) throws IOException {
KafkaTransactionManager ktm = new KafkaTransactionManager(kafkaProducerFactory(myConfig, kafkaProperties));;
ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}

日志——

13:20:19.317 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {TOPICNAME=OffsetAndMetadata{offset=1904, leaderEpoch=null, metadata=''}}
13:20:19.318 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@53e77e17] sendOffsetsToTransaction({TOPICNAME=OffsetAndMetadata{offset=1904, leaderEpoch=null, metadata=''}}, GroupMetadata(groupId = CG.MANAGER-dev-blue1, generationId = 11, memberId = C.ABC_XYZ_MANAGER-1d309d57-0245-417e-bb94-5e49fe811257-0-971960cf-6bc4-4aa0-bf46-eb15900e1abf, groupInstanceId = ))
13:20:19.491 [kafka-producer-network-thread | P.ABC_XYZ_MANAGER-e57fc82a-77bc-4490-87cb-46e7b584746c-1] INFO  o.a.k.c.p.i.TransactionManager - [Producer clientId=P.ABC_XYZ_MANAGER-e57fc82a-77bc-4490-87cb-46e7b584746c-1, transactionalId=DraftMgr-CG.MANAGER-dev-blue1.TOPICNAME.0] Discovered group coordinator ivapp12230670573.devin1.ms.com:9093 (id: 3 rack: null)
13:20:20.285 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.i.TransactionInterceptor - Completing transaction for [com.ms.wmbanking.draftmanager.kafka.DraftAwaitingSaveListener.receiveCreationMessage]
13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.l.a.RecordMessagingMessageListenerAdapter - Listener method returned result [InvocationResult [result=null, sendTo=null, messageReturnType=false]] - generating response message for it
13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.l.a.RecordMessagingMessageListenerAdapter - No replyTopic to handle the reply: null
13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering beforeCommit synchronization
13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering beforeCompletion synchronization
13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.s.TransactionSynchronizationManager - Removed value [org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization@4f6f2a3e] for key [SessionImpl(322929867<open>)] from thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.t.KafkaTransactionManager - Initiating transaction commit
13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@53e77e17] commitTransaction()
13:20:20.369 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering afterCommit synchronization
13:20:20.706 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN  o.h.e.jdbc.spi.SqlExceptionHelper - SQL Error: -302, SQLState: 22001
13:20:20.707 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.h.e.jdbc.spi.SqlExceptionHelper - DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=4.13.127
13:20:20.797 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.s.TransactionSynchronizationManager - Clearing transaction synchronization
13:20:20.797 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering afterCompletion synchronization
13:20:20.797 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.s.TransactionSynchronizationManager - Removed value [org.springframework.kafka.core.KafkaResourceHolder@16feb510] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@5f113675] from thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
13:20:20.798 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@53e77e17] close(PT5S)
13:20:20.798 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Transaction rolled back
org.springframework.orm.jpa.JpaSystemException: Error while committing the transaction; nested exception is javax.persistence.RollbackException: Error while committing the transaction
at org.springframework.orm.jpa.EntityManagerFactoryUtils.convertJpaAccessExceptionIfPossible(EntityManagerFactoryUtils.java:408)
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.convertException(ExtendedEntityManagerCreator.java:508)
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:480)
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:136)
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:124)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:945)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:782)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1839)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1811)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.persistence.RollbackException: Error while committing the transaction
at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:81)
at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:104)
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:477)
... 14 common frames omitted
Caused by: javax.persistence.PersistenceException: org.hibernate.exception.DataException: could not execute statement
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:154)
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:181)
at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:65)
... 16 common frames omitted
Caused by: org.hibernate.exception.DataException: could not execute statement
at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:52)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99)
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:200)
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3302)
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3829)
at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:107)
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604)
at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475)
at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:345)
at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40)
at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:93)
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362)
at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:453)
at org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:3212)
at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2380)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:447)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281)
at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101)
... 15 common frames omitted
Caused by: com.ibm.db2.jcc.am.SqlDataException: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=4.13.127
at com.ibm.db2.jcc.am.id.a(id.java:669)
at com.ibm.db2.jcc.am.id.a(id.java:60)
at com.ibm.db2.jcc.am.id.a(id.java:127)
at com.ibm.db2.jcc.am.no.b(no.java:2310)
at com.ibm.db2.jcc.am.no.c(no.java:2293)
at com.ibm.db2.jcc.t4.cb.l(cb.java:370)
at com.ibm.db2.jcc.t4.cb.a(cb.java:62)
at com.ibm.db2.jcc.t4.q.a(q.java:50)
at com.ibm.db2.jcc.t4.tb.b(tb.java:220)
at com.ibm.db2.jcc.am.oo.oc(oo.java:3428)
at com.ibm.db2.jcc.am.oo.b(oo.java:4383)
at com.ibm.db2.jcc.am.oo.b(oo.java:4554)
at com.ibm.db2.jcc.am.oo.gc(oo.java:784)
at com.ibm.db2.jcc.am.oo.executeUpdate(oo.java:763)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic.performQueryExecutionListener(StatementProxyLogic.java:316)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic.access$700(StatementProxyLogic.java:37)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic$1.execute(StatementProxyLogic.java:123)
at net.ttddyy.dsproxy.listener.MethodExecutionListenerUtils.invoke(MethodExecutionListenerUtils.java:42)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic.invoke(StatementProxyLogic.java:120)
at net.ttddyy.dsproxy.proxy.jdk.PreparedStatementInvocationHandler.invoke(PreparedStatementInvocationHandler.java:37)
at com.sun.proxy.$Proxy303.executeUpdate(Unknown Source)
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:197)
... 34 common frames omitted
13:20:20.801 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR c.m.w.m.kafka.TestRollbackProcesor - Backoff none exhausted for ConsumerRecord(topic = TOPICNAME, partition = 0, leaderEpoch = 16, offset = 1903, CreateTime = 1633938613586, serialized key size = -1, serialized value size = 2228, headers = RecordHeaders(headers = [RecordHeader(key = singularityheader, value = [110, 111, 116, 120, 100, 101, 116, 101, 99, 116, 61, 116, 114, 117, 101, 42, 99, 116, 114, 108, 103, 117, 105, 100, 61, 49, 54, 51, 51, 55, 55, 50, 55, 57, 52, 42, 97, 112, 112, 73, 100, 61, 55, 52, 50, 42, 110, 111, 100, 101, 105, 100, 61, 49, 50, 53, 50, 49, 48])], isReadOnly = false), key = null, value = {
"DATA":"ASJHNFHJJHGJHHJK"
})
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.orm.jpa.JpaSystemException: Error while committing the transaction; nested exception is javax.persistence.RollbackException: Error while committing the transaction
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1865)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1811)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.orm.jpa.JpaSystemException: Error while committing the transaction; nested exception is javax.persistence.RollbackException: Error while committing the transaction
at org.springframework.orm.jpa.EntityManagerFactoryUtils.convertJpaAccessExceptionIfPossible(EntityManagerFactoryUtils.java:408)
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.convertException(ExtendedEntityManagerCreator.java:508)
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:480)
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:136)
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:124)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:945)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:782)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1839)
... 7 common frames omitted
Caused by: javax.persistence.RollbackException: Error while committing the transaction
at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:81)
at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:104)
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:477)
... 14 common frames omitted
Caused by: javax.persistence.PersistenceException: org.hibernate.exception.DataException: could not execute statement
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:154)
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:181)
at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:65)
... 16 common frames omitted
Caused by: org.hibernate.exception.DataException: could not execute statement
at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:52)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99)
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:200)
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3302)
at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3829)
at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:107)
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604)
at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475)
at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:345)
at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40)
at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:93)
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362)
at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:453)
at org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:3212)
at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2380)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:447)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40)
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281)
at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101)
... 15 common frames omitted
Caused by: com.ibm.db2.jcc.am.SqlDataException: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=4.13.127
at com.ibm.db2.jcc.am.id.a(id.java:669)
at com.ibm.db2.jcc.am.id.a(id.java:60)
at com.ibm.db2.jcc.am.id.a(id.java:127)
at com.ibm.db2.jcc.am.no.b(no.java:2310)
at com.ibm.db2.jcc.am.no.c(no.java:2293)
at com.ibm.db2.jcc.t4.cb.l(cb.java:370)
at com.ibm.db2.jcc.t4.cb.a(cb.java:62)
at com.ibm.db2.jcc.t4.q.a(q.java:50)
at com.ibm.db2.jcc.t4.tb.b(tb.java:220)
at com.ibm.db2.jcc.am.oo.oc(oo.java:3428)
at com.ibm.db2.jcc.am.oo.b(oo.java:4383)
at com.ibm.db2.jcc.am.oo.b(oo.java:4554)
at com.ibm.db2.jcc.am.oo.gc(oo.java:784)
at com.ibm.db2.jcc.am.oo.executeUpdate(oo.java:763)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic.performQueryExecutionListener(StatementProxyLogic.java:316)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic.access$700(StatementProxyLogic.java:37)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic$1.execute(StatementProxyLogic.java:123)
at net.ttddyy.dsproxy.listener.MethodExecutionListenerUtils.invoke(MethodExecutionListenerUtils.java:42)
at net.ttddyy.dsproxy.proxy.StatementProxyLogic.invoke(StatementProxyLogic.java:120)
at net.ttddyy.dsproxy.proxy.jdk.PreparedStatementInvocationHandler.invoke(PreparedStatementInvocationHandler.java:37)
at com.sun.proxy.$Proxy303.executeUpdate(Unknown Source)
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:197)
... 34 common frames omitted

13:20:20.798 [org.springframework.kafka. net]KafkaListenerEndpointContainer#0-0-C-1] ERROR . s.k.l.kafkamessagelistenercontainer $ListenerConsumer事务回滚

Kafka事务确实回滚了。

可能您的用户正在使用默认的isolation.level(read_uncommitted)。

对于Kafka,生产者记录总是被写入日志,后面跟着一个标记块,指示事务是被提交还是回滚。

消费者必须有isolation.levelread_committed才能跳过回滚记录。

最新更新