在python kafka消费者中获取CommitFailedError



我已经用以下配置在python中创建了Kafka消费者。

consumer = KafkaConsumer(topic, 
group_id='consumer', 
bootstrap_servers=[bootstrap_servers], 
auto_offset_reset='latest', 
value_deserializer=lambda m:json.loads(m.decode('utf-8')), 
max_poll_records=1, 
max_poll_interval_ms=900000)

每条记录的处理时间约为10分钟,小于max_poll_interval_msmax_poll_interval_ms值,当consumer.commit()被调用时,我得到以下异常

kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records

我不知道为什么它失败了,有人可以帮助这里吗?

您能确认在什么时间点进行补偿吗?你是否使用任何一个提交api(同步/异步),或者你只是设置自动提交属性,如enable.auto.commit = true auto. Commit .interval.ms。因为,这些参数是决定消费者提交成功或失败的关键参数。此外,了解session.timeout.ms的行为和问题的解决方案也很有帮助。

session.timeout.ms

消费者在静止状态下与代理失去联系的时间默认为3秒。如果超过session.timeout.ms通过如果消费者不向组协调器发送心跳,则认为是这样死亡和群体协调者将触发消费群体的再平衡将已死消费者的分区分配给组中的其他消费者。这属性与心跳,间隔,毫秒密切相关。heartbeat.interval.ms欺诈检测KafkaConsumer poll()方法发送一次心跳的频率组协调器,而session.timeout.ms控制消费者可以使用多长时间不要发出心跳信号。因此,这两个性质通常是模态的配置在一起- heat .interval.ms必须低于session.timeout。女士,通常设置为超时值的三分之一。如果session。timeout。ms等于3秒心跳间隔,毫秒应该是1秒。设置session.timeout.ms低于默认值将允许消费者组检测故障并从故障中恢复更快,但也可能导致不必要的再平衡,因为消费者拿走了完成轮询循环或垃圾收集的时间更长。设置session.timeout.ms更高将减少意外再平衡的机会,但也意味着它将需要更长的时间来检测真正的故障。

最新更新