事务同步:如何使用Reactor Kafka和R2DBC创建ChainedKafkaTransactionManager



在我的Spring Boot(WebFlux/R2DBC/Rector Kafka(应用程序中,我有一个如下的使用者

@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver
.receive()
.doOnNext { record ->
val myEvent = record.value()
myService.deleteSomethingFromDbById(myEvent.myId)
.thenEmpty {
record.receiverOffset().acknowledge()
}.subscribe()
}
.subscribe()
}

我想为Kafka和DB事务添加事务同步。阅读文档和一些stakoverflow问题后

  • 使用Database+kafka示例的春季启动中的事务同步
  • Spring Kafka ChainedKafkaTransactionManager没有';t与JPA Spring数据事务同步
  • Spring Kafka中的事务同步
  • Spring@Transactional具有跨多个数据源的事务

似乎ChainedKafkaTransactionManager将是一条路。

但以下代码不能像ChainedKafkaTransactionManager所期望的PlatformTransactionManager类型的事务管理器那样工作。因此不接受参数r2dbcTransactionManager

@Bean(name = ["chainedTransactionManager"])
fun chainedTransactionManager(
r2dbcTransactionManager: R2dbcTransactionManager,
kafkaTransactionManager: KafkaTransactionManager<*, *>
) = ChainedKafkaTransactionManager(kafkaTransactionManager, r2dbcTransactionManager)

有其他方法可以做到这一点吗?

对于Kafka消费者来说,链接交易毫无意义。仅适用于发布者,即传出消息。

但您应该确保不要多次处理同一条消息。

@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver.receive()
// Make sure to have unique index on (topic, partition, offset)
// so you receive a ConstraintViolationException
.flatMap { r ->
val msg = ConsumedMessage(r.topic(), r.partition(), r.offset())
consumedMessagesRepository.save(msg).thenReturn(r)
}
.onErrorContinue {ex, r -> log.warn("Duplicate msg") }
.flatMap { r ->
myService.deleteSomethingFromDbById(r.value().myId)
.thenReturn(r)
}
.flatMap { r ->
r.receiverOffset().commit()
}
.subscribe()
}

最新更新