我正在尝试实现一个简单的Producer-->卡夫卡-->Kafka Shell中的消费者应用程序。我能够成功地生成和使用消息,但问题发生在我重新启动消费者时,每次重新启动消费者的旧消息都会被拾取。有没有什么方法可以避免在我重新启动消费者时失败的消息只会被接收
从shell。。。意思是kafka-console-consumer
?如果是:
-
不要经常使用
--from-beginning
-
添加
--group
参数以跟踪已消耗的内容
值得一提的是,默认行为至少是一次交付,因此应该预期重复,并且您需要编写一个不同的消费者来适应幂等/事务生产者