在Apache Kafka主题(python)中消费最后一条消息不工作



我一直在尝试使用以下方法(Kafka python version 2.0.2)获取Kafka主题中的最新消息:

consumer = KafkaConsumer('topic', bootstrap_servers =['localhost:9092'],
value_deserializer = lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset ='latest',enable_auto_commit=False,
consumer_timeout_ms=2000)

然后使用:

poll = consumer.poll(timeout_ms=2000, max_records=1)
consumer.seek_to_end()
messages = []
temp = []
msgs = []
[temp.extend(i) for i in poll.values()]
[ msgs.extend([i]) for i in temp ]
for message in msgs:
message_data = {}
message_data['partition'] = message.partition
message_data['offset'] = message.offset
message_data['key'] = message.key
message_data['value'] = message.value
message_data['processed'] = False
print('Message data: '+str(message_data))
messages.append(message_data)
print(messages)

然而,我得到一个空数组(消息)。你能提供一些帮助吗?

你正在寻找最后…如果您想要最后一条消息,则需要向后移动一个偏移量,可以使用

之类的内容。
offset = consumer.position(partition)  # TODO: loop over each partition from consumer.assignment()
consumer.seek(partition, offset - 1)

注意:可能需要-2,因为position返回下一个记录以消耗

最新更新