我目前有一个paho MQTT客户端脚本,它带有一个消息队列和一个on_message回调函数。
def on_message(self, client, _, message):
message = message.payload.decode()
self.messageQueue.put_nowait(message)
我需要对这些收到的消息进行一些处理(我每3秒钟在队列中收到大约12条消息(
如何安全地处理这些消息?
如果您想要一个健壮的分布式队列机制,您可以使用python-celene来实现,其中您的on_message将充当将任务放置在broker上的生产者。多个工作人员/消费者将在不同的机器上运行,并使用来自代理的任务。
链接:http://docs.celeryproject.org/en/latest/getting-started/introduction.html
希望这能有所帮助!