使用命令行在kafka中消费消息时如何设置组名的任何想法。
我尝试使用以下命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --config group.id=test1
'config' is not a recognized option
目标是使用以下命令查找已消耗消息的偏移量:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test1
有人能在这方面帮忙吗!!
提前感谢!!
最简单的解决方案是:
bin/kafka-console-consumer.sh--动物园管理员localhost:2181--主题nil_RF2_P2--从头开始--消费者属性组id=test1
如果您指定标志--从开始,请记住使用者组过去不应该消费任何记录,否则您的使用者将从指定组最早未消费的记录开始消费(而不是从实际开始消费,因为您可能错误地假设)。
从命令提示符中得到了更改组名的答案!!
步骤:
- 创建一个新的
consumer.properties
文件,比如consumer1.properties
- 改变CCD_ 4中的CCD_
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer.config config/consumer1.properties --delete-consumer-offsets
您可以像这样使用--group选项(使用Kafka 2.0.0测试):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test --from-beginning
如果您想在不丢失记录偏移量的情况下更改组id,您必须手动获取当前group.id的偏移量,并将其设置为具有新id的新运行使用者。如果在使用者实例中无法控制获取偏移量,您可以运行此命令。
/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server <ip_address>:<Broker_port> --group Group_name --describe
然后您可以从特定偏移量中查找数据。请注意,您应该呼叫"搜索"后再呼叫"轮询"分配命令无效。你也可以在github 中看到我的代码示例
此处的示例
如果您正在使用bash,那么您可以使用它的进程替换功能。
bin/kafka-console-consumer.sh --zookeeper localhost:2181
--topic nil_RF2_P2 --from-beginning
--consumer.config <(echo group.id=test1)