How to get the latest offset from each partition using kafka-python



I am trying to get the latest offset (not the committed offset) from each partition of a given topic.

from kafka import KafkaConsumer, TopicPartition
topic = 'test-topic'
broker = 'localhost:9092'
consumer = KafkaConsumer(bootstrap_servers=broker)
tp = TopicPartition(topic, 0)        #1
consumer.assign([tp])                #2
consumer.seek_to_end(tp)             #3
last_offset = consumer.position(tp)  #4
for i in consumer.partitions_for_topic(topic):
    tp = TopicPartition(topic, i)
    consumer.assign([tp])
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    print(last_offset)

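The code above works and prints the offset of each partition. Notice, however, that the same 4 lines appear both outside and inside the loop. If I remove any of lines #1 - #4 (the 4 lines directly before the for loop), I get this error:

  File "check_kafka_offset.py", line 19, in <module>
    for i in consumer.partitions_for_topic(topic):
TypeError: 'NoneType' object is not iterable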

Why do I need the 4 lines before the for loop?

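You can use the end_offsets(partitions) function of that client to get the last offset for the specified partitions. Note that the returned offset is the next offset, i.e. the current end + 1. This is described in the kafka-python KafkaConsumer documentation.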

Edit: example implementation:

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
BOOTSTRAP="""cluster:9092"""
API_KEY="""redacted"""
API_SECRET="""redacted"""
TOPIC="python-test"
consumer = KafkaConsumer(
    group_id="my-group",
    bootstrap_servers=[BOOTSTRAP],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=API_KEY,
    sasl_plain_password=API_SECRET,
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    auto_offset_reset='earliest'
)
PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
    PARTITIONS.append(TopicPartition(TOPIC, partition))
    
end_offsets = consumer.end_offsets(PARTITIONS)
print(end_offsets)

end_offsets looks like this:

{TopicPartition(topic=u'python-test', partition=0): 5,
 TopicPartition(topic=u'python-test', partition=1): 20,
 TopicPartition(topic=u'python-test', partition=2): 0}
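
Because end_offsets returns the next offset rather than the offset of the last record, it can help to see the subtraction spelled out. The following is a minimal sketch using the sample values above. The helper name last_record_offsets is hypothetical, and the namedtuple is only a stand-in for kafka.TopicPartition so the example runs without a broker. It also assumes that end - 1 actually holds a record, which may not be true for compacted or transactional topics.

```python
from collections import namedtuple

# Stand-in for kafka.TopicPartition (same field names), used only so this
# example runs without kafka-python or a broker.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def last_record_offsets(end_offsets):
    """Map each partition to end_offset - 1, or None when the end offset is 0.

    Hypothetical helper: assumes the offset just before the end holds a record.
    """
    return {
        tp: (end - 1 if end > 0 else None)
        for tp, end in end_offsets.items()
    }


# Same values as the sample end_offsets output above.
end_offsets = {
    TopicPartition("python-test", 0): 5,
    TopicPartition("python-test", 1): 20,
    TopicPartition("python-test", 2): 0,
}

last = last_record_offsets(end_offsets)
for tp in sorted(last):
    print(tp.partition, last[tp])
# prints:
# 0 4
# 1 19
# 2 None
```

With a real consumer, the dictionary returned by consumer.end_offsets(PARTITIONS) could be passed in directly, since its keys expose the same topic and partition fields.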

Here is a simple, well-documented function:

from kafka import TopicPartition
def getTopicInfos(consumer, topic: str):
    """
    Get a topic's information, such as its partitions and their last offsets.
    Example of result: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']}
    - Parameters:
      consumer: A Kafka consumer.
      topic: A topic name.
    - Return:
      The topic's information.
    """
    # Get topic-partition pairs
    # E.g: [TopicPartition(topic='myTopic', partition=0), TopicPartition(topic='myTopic', partition=1)]
    tp = [TopicPartition(topic, partition) for partition in consumer.partitions_for_topic(topic)]
    # Get last offsets
    # E.g: {TopicPartition(topic='myTopic', partition=0): 47, TopicPartition(topic='myTopic', partition=1): 98}
    tplo = consumer.end_offsets(tp)
    # Format partition-lastOffset pairs
    # E.g: ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']
    plo = ['{' + f'"partition": {item.partition}, "lastOffset": {tplo.get(item)}' + '}' for item in tplo]
    # Concat topic with partition-lastOffset pairs
    # E.g: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']}
    tplo = {"topic": topic, "partitions": plo}
    # Return the result
    return tplo
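
As the TypeError in the question shows, partitions_for_topic can return None, and the function above would fail in that case. Here is a hedged variation, not a replacement: the name topic_end_offsets is hypothetical, it returns a plain dict instead of pre-formatted strings, and the ImportError fallback exists only so the sketch can be run without kafka-python installed.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:
    # Stand-in with the same fields, only so the sketch runs without kafka-python.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def topic_end_offsets(consumer, topic):
    """Return {partition: end_offset} for a topic, or {} if no partitions are reported."""
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        # The consumer reported no partitions for this topic; iterating over
        # None is what raised the TypeError in the question.
        return {}
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    end_offsets = consumer.end_offsets(tps)
    # Key the result by partition number for easier reading.
    return {tp.partition: offset for tp, offset in end_offsets.items()}
```

Called as topic_end_offsets(consumer, 'myTopic') with an existing KafkaConsumer, this would yield something like {0: 47, 1: 98} for the example values in the docstring above. Returning an empty dict is one design choice; raising an exception would be equally reasonable if a missing topic should be treated as an error.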
