卡夫卡消费者无法阅读新消息



我对Kafka还比较陌生,我正试图在主题上发送消息后产生消费者。

单个生产者在不同的分区上发送200个消息。

consumer-1已经在运行,consumer-1正在侦听所有4个分区,几秒钟内,同一组中的另一个consumer(consumer-2(启动。然后Kafka触发组的重新平衡,但所有最初的200条消息都将流向消费者-1,新的消息将流向消费者-2。创建新消费者后,描述消费者组API show
Warning: Consumer group 'ner_group' is rebalancing.

有人能建议如何在消费者组中添加新的消费者,以阅读已经发送到某个主题的消息,而另一个消费者已经在阅读该主题吗?

下面包含消费者和生产者的配置。

生产者

producer = KafkaProducer(bootstrap_servers=[f'{ip}:9092'],
value_serializer=lambda x: 
dumps(x).encode('utf-8'))
for e in range(200):
data = {'count':e ,'time':time.time() }
producer.send('ner', value=data)

消费者

consumer = KafkaConsumer(
'ner',
bootstrap_servers=[f'{ip}:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='ner_group',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for message in consumer:
msg = message.value
time.sleep(1)

我多次运行使用者脚本。

所有最初的200条消息都将进入消费者-1,新消息将进入消费者-2

这可能是你正在经历的,但这不是保证的行为。

如何在消费者组中添加新的消费者,以读取已经发送到某个主题的消息,而另一个消费者已经在阅读该主题?

您所做的是正确的。添加另一个使用者不会读取已提交的组中的偏移

最新更新