我有下面的Kafka消费者,如果将group_id
分配为None,它会很好地工作——它接收到所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
但是,如果我将group_id
设置为某个值,它就不会收到任何内容。我试图运行测试生成器来发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO线程池执行器0_0 base.py(Re-(加入组my_group2020-11-07 00:56:07 INFO ThreadPoolExecutiator-0_0-base.py成功加入第497代my_group组2020-11-07 00:56:07 INFO线程池执行器0_0-subscription_state.py更新的分区分配:[]2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 consumer.py为组my_group设置新分配的分区set((
主题的一个分区只能由同一ConsumerGroup中的一个使用者使用。
如果您不设置group.id,KafkaConsumer将为您生成一个新的随机group.id。由于这个group.id是唯一的,您将看到数据正在被消耗。
如果有多个消费者使用相同的group.id运行,那么只有一个消费者会读取数据,而另一个消费者则保持空闲,不消耗任何数据。
我知道,这不是作者问题的解决方案。尽管如此,如果你降落在这里,你可能会因为另一个原因遇到这个问题。和我一样。
因此,至少对于kafka-pythonv2.0.2和Aiven-kafka-broker设置,通过添加consumer.poll((的干式调用解决了这个问题。这特别奇怪,因为当没有对group_id进行asssigned时,这是不需要的。
输出自:
def get():
for message in consumer:
print(message.value)
consumer.commit()
这种情况下没有
而下面的工作如预期。它只读取来自上一个commit((的新消息:
输出自:
def get():
consumer.poll()
for message in consumer:
print(message.value)
consumer.commit()
它输出自上次提交以来该主题中的所有消息,正如预期的
JFYI,类构造函数如下所示:
consumer = KafkaConsumer(
topics,
bootstrap_servers=self._service_uri,
auto_offset_reset='earliest',
enable_auto_commit=False,
client_id='my_consumer_name',
group_id=self.GROUP_ID,
security_protocol="SSL",
ssl_cafile=self._ca_path,
ssl_certfile=self._cert_path,
ssl_keyfile=self._key_path,
)
\_(ツ)_/