嗨,我正试图使用Alpakka文档中显示的Producer api。我可以使用事务源和Producer来使用记录,但不能将消息放入主题无法在Alpakka中使用Transactional.Sink生成主题,但我看到幂等生成器已启用。我看到日志,它正在融入逻辑,但它没有生成myTopic 的事件
[info]o.a.k.c.p.KafkaProducer-[PProducer clientId=Producer-7fe8789c-3171-429e-afbf-d8a8ba12700c,transactionalId=7fe8789c-3171-429e afbf-d8ba12770c]幂等生产者已启用。
你能帮我理解为什么它可能不会产生主题的消息吗
我正在使用docker 在本地运行我的代码
下面是我的代码
``` Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.map(consumerRecord => {
handleBusiness(consumerRecord)
.flatMap(res => Source.single(res)
.runWith(Transactional.sink(producerSettings,
UUID.randomUUID().toString)))
})
}
source.runWith(Sink.ignore)
And my handleBusiness logics looks like below:
```
private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]] = {
(conversion of consumedMessage ) map { message =>
ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)
}
}```
我可以使用一个流来完成。事务源需要有一个像下面那样的Sink/Flow
Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.mapAsync(5) { msg => business(msg)}
.via(Transactional.flow(producersettings, transactions-id))