融合kafka消费者使用asyncio的问题



我正试图将asyncio功能集成到我的kafka主题监听器中,但遇到了一些问题(对于python中的异步编程来说,这是一个全新的问题(。

我创建了一个confluent-kafka consumer,它正在听一个主题。该主题经常有消息,性能至关重要(因此引入了异步io(。

main((函数如下所示:

def main(self):
while True:
try:
msg = consumer.poll(timeout=5.0)
if msg is None:
continue
asyncio.ensure_future(handle_message(message))
finally:
consumer.close()     

从本质上讲,我想以线性的方式将消息从主题中提取出来,但消息的处理应该是异步的。。。这意味着handle_message中发生的任何数据库I/O等都将被异步处理(我在该函数中正确设置了等待等(。问题是,我似乎从来没有在asyncio.ensure_future((中开始执行。当我从kafka主题中提取消息时,我如何不断地向异步循环添加任务使用confluent-kafka==1.5.0

一个问题是msg = consumer.poll(timeout=5.0)将阻塞事件循环。避免这种情况的一种方法是在concurrent.futures.ThreadPoolExecutor中运行它(应该在循环之前创建它,而不是为循环的每次迭代创建一个新的(。

另一个问题是,你没有处理数据输入速度快于处理速度的情况。我建议你考虑一些方法来限制正在运行的任务的数量。一种技术是将每个新创建的任务放入一个集合或类似的集合中。添加项目之前,请清除已完成任务的集合。然后检查大小,看看是否要启动另一项任务,如果不想,请等待一项任务完成。

最新更新