Confluent-Kafka Python -描述消费者组(以获得每个消费者组的延迟)



我想使用confluent-kafka获取消费者组的详细信息。与之等价的cli是'

./kafka-consumer-groups.sh --bootstrap-server XXXXXXXXX:9092 --describe --group my-group

我的最终目标是从输出中获得滞后值。在confluent-kafka python API中是否有任何方法来获取这些细节?在java API中有一个方法,但是我在python API中找不到它。

我尝试在adminClient API中使用descripbe_configs方法,但它最终抛出了kafkaException与以下详细信息

这很可能是由于客户端库的请求格式错误或消息被发送到不兼容的代理而发生的。查看代理日志了解更多详细信息。

目前我已经提出了以下解决方案。这是一种获得消费者群体综合滞后的方法

def get_lag(topic,numPartitions):
diff = list()
for i in range(numPartitions):
topic_partition = TopicPartition(topic, partition=i)
low, high = consumer.get_watermark_offsets(topic_partition)
currentList = consumer.committed([topic_partition])
current = currentList[0].offset
diff.append(high-current)
return sum(diff) # Combined Lag

最新更新