从卡夫卡消费者创建新的生产者



如何使用java从现有Consumer创建新的Kafka Producer?

您不能从KafkaConsumer实例创建KafkaProducer。

您必须使用与制作人相同的连接设置来显式创建KafkaProducer。

考虑到您提到的用例(将数据从一个主题复制到另一个主题(,我建议使用Kafka Streams。事实上,卡夫卡中有一个例子就是这样做的:https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java

我建议使用Kafka Streams库。它从卡夫卡主题中读取数据,并进行一些处理并写回另一个主题。

对你来说,这可能是一种更简单的方法。https://kafka.apache.org/documentation/streams/

目前的限制是,源和目标集群应该与Kafka Streams相同。否则,您需要使用处理器API来定义另一个目标集群。

另一种方法是简单地在消费者程序中定义生产者。只要您的规则匹配(基于偏移量或任何条件(,请调用producer.send()方法

最新更新