如果将 group_id 设置为 None,则 Kafka 使用者会收到消息,但如果不是 None,则不会收到任何消息?



我有下面的Kafka消费者,如果将group_id分配为None,它会很好地工作——它接收到所有历史消息和我新测试的消息。

consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:

但是,如果我将group_id设置为某个值,它就不会收到任何内容。我试图运行测试生成器来发送新消息,但没有收到任何消息。

消费者控制台确实显示以下消息:

2020-11-07 00:56:01 INFO线程池执行器0_0 base.py(Re-(加入组my_group2020-11-07 00:56:07 INFO ThreadPoolExecutiator-0_0-base.py成功加入第497代my_group组2020-11-07 00:56:07 INFO线程池执行器0_0-subscription_state.py更新的分区分配:[]2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 consumer.py为组my_group设置新分配的分区set((

主题的一个分区只能由同一ConsumerGroup中的一个使用者使用。

如果您不设置group.id,KafkaConsumer将为您生成一个新的随机group.id。由于这个group.id是唯一的,您将看到数据正在被消耗。

如果有多个消费者使用相同的group.id运行,那么只有一个消费者会读取数据,而另一个消费者则保持空闲,不消耗任何数据。

我知道,这不是作者问题的解决方案。尽管如此,如果你降落在这里,你可能会因为另一个原因遇到这个问题。和我一样。

因此,至少对于kafka-pythonv2.0.2和Aiven-kafka-broker设置,通过添加consumer.poll((的干式调用解决了这个问题。这特别奇怪,因为当没有对group_id进行asssigned时,这是不需要的。

输出自:

def get():
for message in consumer:
print(message.value)
consumer.commit()

这种情况下没有

而下面的工作如预期。它只读取来自上一个commit((的新消息:

输出自:

def get():
consumer.poll()
for message in consumer:
print(message.value)
consumer.commit()

它输出自上次提交以来该主题中的所有消息,正如预期的

JFYI,类构造函数如下所示:

consumer = KafkaConsumer(
topics,
bootstrap_servers=self._service_uri,
auto_offset_reset='earliest',
enable_auto_commit=False,
client_id='my_consumer_name',
group_id=self.GROUP_ID,
security_protocol="SSL",
ssl_cafile=self._ca_path,
ssl_certfile=self._cert_path,
ssl_keyfile=self._key_path,
)

\_(ツ)_/

相关内容

最新更新