如何使用命令行在kafka中消费消息时设置组名称



使用命令行在kafka中消费消息时如何设置组名的任何想法。

我尝试使用以下命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF2_P2 --from-beginning --config group.id=test1
'config' is not a recognized option

目标是使用以下命令查找已消耗消息的偏移量:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test1

有人能在这方面帮忙吗!!

提前感谢!!

最简单的解决方案是:

bin/kafka-console-consumer.sh--动物园管理员localhost:2181--主题nil_RF2_P2--从头开始--消费者属性组id=test1

如果您指定标志--从开始,请记住使用者组过去不应该消费任何记录,否则您的使用者将从指定组最早未消费的记录开始消费(而不是从实际开始消费,因为您可能错误地假设)。

从命令提示符中得到了更改组名的答案!!

步骤:

  1. 创建一个新的consumer.properties文件,比如consumer1.properties
  2. 改变CCD_ 4中的CCD_
  3. bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer.config config/consumer1.properties --delete-consumer-offsets

您可以像这样使用--group选项(使用Kafka 2.0.0测试):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test --from-beginning

如果您想在不丢失记录偏移量的情况下更改组id,您必须手动获取当前group.id的偏移量,并将其设置为具有新id的新运行使用者。如果在使用者实例中无法控制获取偏移量,您可以运行此命令。

/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server <ip_address>:<Broker_port>  --group Group_name --describe

然后您可以从特定偏移量中查找数据。请注意,您应该呼叫"搜索"后再呼叫"轮询"分配命令无效。你也可以在github 中看到我的代码示例

此处的示例

如果您正在使用bash,那么您可以使用它的进程替换功能。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
--topic nil_RF2_P2 --from-beginning 
--consumer.config <(echo group.id=test1)

最新更新