无法在Alpakka中使用Transactional.Ssink向Kafka主题生成消息,但我看到幂等生成器已启用



嗨,我正试图使用Alpakka文档中显示的Producer api。我可以使用事务源和Producer来使用记录,但不能将消息放入主题无法在Alpakka中使用Transactional.Sink生成主题,但我看到幂等生成器已启用。我看到日志,它正在融入逻辑,但它没有生成myTopic 的事件

[info]o.a.k.c.p.KafkaProducer-[PProducer clientId=Producer-7fe8789c-3171-429e-afbf-d8a8ba12700c,transactionalId=7fe8789c-3171-429e afbf-d8ba12770c]幂等生产者已启用。

你能帮我理解为什么它可能不会产生主题的消息吗

我正在使用docker 在本地运行我的代码

下面是我的代码

``` Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.map(consumerRecord => {
handleBusiness(consumerRecord)
.flatMap(res => Source.single(res)
.runWith(Transactional.sink(producerSettings, 
UUID.randomUUID().toString)))
})
}
source.runWith(Sink.ignore)

And my handleBusiness logics looks like below:
```
private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]]  = {
(conversion of consumedMessage ) map { message =>
ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)
}

}```

我可以使用一个流来完成。事务源需要有一个像下面那样的Sink/Flow

Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.mapAsync(5) { msg => business(msg)}
.via(Transactional.flow(producersettings, transactions-id))

相关内容

  • 没有找到相关文章

最新更新