I am trying to get the latest offset (not the committed offset) from each partition of a given topic.
from kafka import KafkaConsumer, TopicPartition
topic = 'test-topic'
broker = 'localhost:9092'
consumer = KafkaConsumer(bootstrap_servers=broker)
tp = TopicPartition(topic, 0) #1
consumer.assign([tp]) #2
consumer.seek_to_end(tp) #3
last_offset = consumer.position(tp) #4
for i in consumer.partitions_for_topic(topic):
    tp = TopicPartition(topic, i)
    consumer.assign([tp])
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    print(last_offset)
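The preceding code works and prints the offset of each partition. Note, however, that I have the same 4 lines both outside and inside the loop. If I remove any of lines #1 - #4 (the 4 lines directly before the for loop), I get this error:
File "check_kafka_offset.py", line 19, in <module>
    for i in consumer.partitions_for_topic(topic):
TypeError: 'NoneType' object is not iterable
Why do I need the 4 lines before the for loop?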
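The end_offsets(partitions) function in this client returns the last offset for each of the specified partitions. Note that the returned offset is the next offset, i.e. the current end + 1. See the KafkaConsumer documentation for details.
Edit: example implementation: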
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
BOOTSTRAP="""cluster:9092"""
API_KEY="""redacted"""
API_SECRET="""redacted"""
TOPIC="python-test"
consumer = KafkaConsumer(
    group_id="my-group",
    bootstrap_servers=[BOOTSTRAP],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=API_KEY,
    sasl_plain_password=API_SECRET,
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    auto_offset_reset='earliest'
)
# Build one TopicPartition per partition of the topic
PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
    PARTITIONS.append(TopicPartition(TOPIC, partition))
# Ask the broker for the end offset of every partition in one call
end_offsets = consumer.end_offsets(PARTITIONS)
print(end_offsets)
end_offsets looks like this:
{TopicPartition(topic=u'python-test', partition=0): 5,
TopicPartition(topic=u'python-test', partition=1): 20,
TopicPartition(topic=u'python-test', partition=2): 0}
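Because end_offsets reports the next offset to be written, the offset of the last message actually in a partition is one less. Here is a minimal sketch of that arithmetic using the sample output above. The helper name last_message_offsets is hypothetical, the namedtuple is only a stand-in with the same fields as kafka.TopicPartition so the snippet runs without a broker, and it assumes nothing has been removed by retention (otherwise a non-zero end offset does not guarantee a message at end - 1).

```python
from collections import namedtuple

# Stand-in with the same fields as kafka.TopicPartition (assumption for this sketch).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def last_message_offsets(end_offsets):
    """Map partition id -> offset of the last message, or None if the end offset is 0."""
    result = {}
    for tp, end in end_offsets.items():
        # end is the NEXT offset to be written, so the last message sits at end - 1.
        result[tp.partition] = end - 1 if end > 0 else None
    return result


# Same shape as the dict printed by consumer.end_offsets(PARTITIONS) above.
sample = {
    TopicPartition("python-test", 0): 5,
    TopicPartition("python-test", 1): 20,
    TopicPartition("python-test", 2): 0,
}
print(last_message_offsets(sample))  # {0: 4, 1: 19, 2: None}
```

With a real consumer you would pass the result of consumer.end_offsets(PARTITIONS) instead of the sample dict.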
Here is a simple, well-documented function:
from kafka import TopicPartition
def getTopicInfos(consumer, topic: str):
    """
    Get a topic's information, such as its partitions with their last offsets.
    Example of result: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']}
    - Parameters:
      consumer: A Kafka consumer.
      topic: A topic name.
    - Return:
      The topic's information.
    """
    # Get topic-partition pairs
    # E.g: [TopicPartition(topic='myTopic', partition=0), TopicPartition(topic='myTopic', partition=1)]
    tp = [TopicPartition(topic, partition) for partition in consumer.partitions_for_topic(topic)]
    # Get last offsets
    # E.g: {TopicPartition(topic='myTopic', partition=0): 47, TopicPartition(topic='myTopic', partition=1): 98}
    tplo = consumer.end_offsets(tp)
    # Format partition-lastOffset pairs
    # E.g: ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']
    plo = ['{' + f'"partition": {item.partition}, "lastOffset": {tplo.get(item)}' + '}' for item in tplo]
    # Concat topic with partition-lastOffset pairs
    # E.g: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']}
    tplo = {"topic": topic, "partitions": plo}
    # Return the result
    return tplo
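The TypeError in the question shows that partitions_for_topic can return None, and both snippets above would hit the same error in that case. Below is a rough sketch of a variant that checks for None before iterating. The function name end_offsets_for_topic is hypothetical, and the try/except import is only there so the snippet runs without kafka-python installed. My guess is that None means the consumer has no metadata for the topic yet, but treat that as an assumption rather than documented behaviour.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:
    # Fallback with the same fields, only so the sketch runs without kafka-python.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def end_offsets_for_topic(consumer, topic):
    """Return {partition id: end offset} for every partition of the topic."""
    partitions = consumer.partitions_for_topic(topic)
    if partitions is None:
        # Fail with a clear message instead of "'NoneType' object is not iterable".
        raise LookupError(f"no partition metadata available for topic {topic!r}")
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    # One call for all partitions; values are the next offsets to be written.
    offsets = consumer.end_offsets(tps)
    return {tp.partition: offsets[tp] for tp in tps}
```

Usage would be something like end_offsets_for_topic(consumer, 'test-topic'), with no assign or seek_to_end needed, since end_offsets does not depend on the consumer's position.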