kafka-python:消费者对象可以在不订阅任何主题的情况下加入消费者组吗



我有一个topics_subscriber.py,它会不断检查并订阅任何新主题。

def _subscribe_topics(consumer: KafkaConsumer) -> None:
topics_to_subscribe = {
topic
for topic in consumer.topics()
if topic.startswith("topic-")
}
subscribed_topics = consumer.subscription()
print(subscribed_topics)
new_topics = (
topics_to_subscribe - subscribed_topics
if subscribed_topics
else topics_to_subscribe
)
if new_topics:
print("new topics detected:")
for topic in new_topics:
print(topic)
print("nsubscribing to old+new topicsn")
consumer.subscribe(topics=list(topics_to_subscribe)) // subscribe() is not incremental, hence subscribing to old+new topics 
print(consumer.subscription())
else:
print("nno new topics detected: exiting normallyn")

def main() -> None:
consumer = kafka.KafkaConsumer(
client_id="client_1",
group_id="my_group",
bootstrap_servers=['localhost:9092']
)
while True:
_subscribe_topics(consumer)
print("nsleeping 10 secn")
time.sleep(10)

现在,在另一个脚本kafka_extractor.py中,我想创建一个新的使用者并加入my_group使用者组,并开始使用该组订阅的主题中的消息。即没有专门订阅这个新消费者的主题。

def main() -> None:
consumer = kafka.KafkaConsumer(
client_id="client_2",
group_id="my_group",
bootstrap_servers=['localhost:9092']
)
print("created consumer")
print(consumer.subscription())
for msg in consumer:
print(msg.topic)

kafka_extractor.py输出中需要注意的两件事:

  1. print(consumer.subscription())输出为None
  2. for msg in consumer:->被卡住,既不前进也不退出程序

如有任何指示,请点击此处。

将使用者加入群组不会获得群组的现有主题,因为群组可以包含多个主题,使用者实例通过订阅来决定要使用哪些主题

你的循环被卡住了,因为它默认从订阅主题的末尾读取,但没有

如果您想使用定期刷新的主题列表,请使用regex订阅

相关内容