如何在Kafka中的命令行中获取主题的group.id



我在服务器上安装了kafka,想学习如何使用它,我发现了一个由scala编写的示例代码,下面是它的一部分,

def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper)
    props.put("group.id", groupId)
    props.put("auto.offset.reset", "largest")
    props.put("zookeeper.session.timeout.ms", "400")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")
    val config = new ConsumerConfig(props)
    config
}

但我不知道如何在我的服务器上找到组id。

group id是您通过为其提供字符串id来为消费者定义的东西。所有以相同id开头的消费者都将以协调的方式"合作"并读取主题,其中每个消费者实例将处理主题中的一个子集消息。提供不存在的组id将被视为新的使用者,并在Zookeeper中创建一个新条目,在该条目中存储已提交的偏移量。

您可以获得一个Zookeeper shell并列出Kafka存储消费者偏移量的路径,如下所示:

./bin/zookeeper-shell.sh localhost:2181
ls /consumers

您将获得所有组的列表。

编辑:我错过了你说你自己设置的部分,所以我认为你想列出现有集群的消费者组
Lundahl是对的,这是您定义的一个属性,用于协调使用者线程,使它们不会消耗"彼此"的消息(每个线程消耗一个子集)。例如,如果您使用两个具有不同组的使用者,他们将各自使用整个主题。

/kafkadir/kafka-consumer-groups.sh--所有主题--引导服务器主机名:端口--列出

最新更新