我有一个新的消费者列表(Python消费者)。我可以使用此命令检索组:
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
我可以为每个主题获得与
连接的主题bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group TheFoundGroupId
- 如何获得与主题连接的所有组(即使不在组中,也是所有的消费者)?
- 除了将其作为shell命令运行外,是否有一种方法可以从Python访问?
感谢您问这个问题。
所有消费者配置(例如消费者组ID),将其订阅哪个主题存储在Zookeeper中。
在命令下运行以连接到Zookeeper
./bin/zookeeper-shell localhost:2181
然后运行
LS/消费者
您将获得存在的所有消费者群体。如果您不提供消费者组。Kafka将分配随机消费者组。对于控制台消费者,它将分配控制台 - 消费者-XXXXX ID
您可以从python片段下方获得所有消费者组
安装Zookeeper Python客户端
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# get all consumer groups
consumer_groups = zk.get_children("/consumers")
print("There are %s consumer group(s) with names %s" % (len(consumer_groups), consumer_groups))
# get all consumers in group
for consumer_group in consumer_groups:
consumers = zk.get_children("/consumers/"+consumer_group)
print("There are %s consumers in %s consumer group. consumer are : %s" % (len(consumers), consumer_group, consumers))
让与主题连接的消费者或消费者群体。
获取/消费者/消费者group_id/ids/cumputer_id/
将为您提供像
这样的输出{"version":1,"subscription":{"test":1},"pattern":"white_list","timestamp":"1514218381246"}
在订阅对象下,消费者订阅的所有主题。根据您的用例实现逻辑
谢谢
这不是最好的解决方案,但是由于没有人似乎有答案,这是我最终解决的方法(将一个组分配给消费者并替换___ yourgroup ____):
): import subprocess
import os
if "KAFKA_HOME" in os.environ:
kafkapath = os.environ["KAFKA_HOME"]
else:
kafkapath = oms_cfg.kafka_home
# error("Please set up $KAFKA_HOME environment variable")
# exit(-1)
instances = []
# cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server {} --list'.format(oms_cfg.bootstrap_servers)
# result = subprocess.run(cmd.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
igr = ____YOURGROUP_____ # or run over all groups from the commented out command
print("Checking topics of consumer group {}".format(igr))
topic_cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server ' + oms_cfg.bootstrap_servers + ' --describe --group {gr}'
result = subprocess.run(topic_cmd.format(gr=igr).split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
table = result.stdout.split(b'n')
# You could add a loop over topic here
for iline in table[1:]:
iline = iline.split()
if not len(iline):
continue
topic = iline[0]
# we could check here for the topic. multiple consumers in same group -> only one will connect to each topic
# if topic != oms_cfg.topic_in:
# continue
client = iline[-1]
instances.append(tuple([client, topic]))
# print("Client {} Topic {} is fine".format(client, topic))
if len(instances):
error("Cannot start. There are currently {} instances running. Client/topic {}".format(len(instances),
instances))
exit(-1)