在我的Spring Boot(WebFlux/R2DBC/Rector Kafka(应用程序中,我有一个如下的使用者
@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver
.receive()
.doOnNext { record ->
val myEvent = record.value()
myService.deleteSomethingFromDbById(myEvent.myId)
.thenEmpty {
record.receiverOffset().acknowledge()
}.subscribe()
}
.subscribe()
}
我想为Kafka和DB事务添加事务同步。阅读文档和一些stakoverflow问题后
- 使用Database+kafka示例的春季启动中的事务同步
- Spring Kafka ChainedKafkaTransactionManager没有';t与JPA Spring数据事务同步
- Spring Kafka中的事务同步
- Spring@Transactional具有跨多个数据源的事务
似乎ChainedKafkaTransactionManager
将是一条路。
但以下代码不能像ChainedKafkaTransactionManager所期望的PlatformTransactionManager
类型的事务管理器那样工作。因此不接受参数r2dbcTransactionManager
。
@Bean(name = ["chainedTransactionManager"])
fun chainedTransactionManager(
r2dbcTransactionManager: R2dbcTransactionManager,
kafkaTransactionManager: KafkaTransactionManager<*, *>
) = ChainedKafkaTransactionManager(kafkaTransactionManager, r2dbcTransactionManager)
有其他方法可以做到这一点吗?
对于Kafka消费者来说,链接交易毫无意义。仅适用于发布者,即传出消息。
但您应该确保不要多次处理同一条消息。
@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver.receive()
// Make sure to have unique index on (topic, partition, offset)
// so you receive a ConstraintViolationException
.flatMap { r ->
val msg = ConsumedMessage(r.topic(), r.partition(), r.offset())
consumedMessagesRepository.save(msg).thenReturn(r)
}
.onErrorContinue {ex, r -> log.warn("Duplicate msg") }
.flatMap { r ->
myService.deleteSomethingFromDbById(r.value().myId)
.thenReturn(r)
}
.flatMap { r ->
r.receiverOffset().commit()
}
.subscribe()
}