使用Python从Confluent Kafka Topic中消费数据并退出



我正在尝试编写一个python代码来消费来自Confluent Kafka主题的数据,并作为测试项目的一部分执行数据验证。我能够读取数据,但是消费进程处于无限循环中,并且如果循环读取所有消息,则寻找退出的决策点。

参见下面的示例代码

conf = {'bootstrap.servers': "server:port", #
'group.id': str(uuid.uuid1),
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',        
'session.timeout.ms': 6000
}
consumer = Consumer(conf) 
consumer.subscribe([topic], on_assign=on_assign)
try:
while True:
msg=consumer.poll(timeout=2.0)
# print(msg)
if msg is None:
print('msg is None')
continue
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
print( msg.error().code())
else:
print(msg.value())           
except KeyboardInterrupt:
print("Interrupted through kb")
finally:
consumer.close()

请建议最好的方法来决定是否所有的消息都被读取,以便我可以退出循环并关闭消费者。

根据定义,Apache Kafka主题是无界的事件流。没有"结束"。对于流,只能选择定义一个人工的结束。

您需要在应用程序逻辑中定义它。例如,如果您在超过<x>秒的时间内没有收到消息,则将其视为"结束",并停止消费。

看起来您缺少enable.partition.eof=True配置,该配置在到达分区结束时将发出错误。通过首先确定我正在使用的主题上存在哪些分区,我能够完成您的问题。然后,当我到达每个分区的末尾时,退出。

相关内容

  • 没有找到相关文章

最新更新