有没有更好的方法来处理队列中从on_message回调收到的消息



我目前有一个paho MQTT客户端脚本,它带有一个消息队列和一个on_message回调函数。

def on_message(self, client, _, message):
message = message.payload.decode()
self.messageQueue.put_nowait(message)

我需要对这些收到的消息进行一些处理(我每3秒钟在队列中收到大约12条消息(

如何安全地处理这些消息?

如果您想要一个健壮的分布式队列机制,您可以使用python-celene来实现,其中您的on_message将充当将任务放置在broker上的生产者。多个工作人员/消费者将在不同的机器上运行,并使用来自代理的任务。

链接:http://docs.celeryproject.org/en/latest/getting-started/introduction.html

希望这能有所帮助!

最新更新