可以使用RabbitMQ通道在一个Python线程中完成消费和发布吗?
根本不是问题,例如pika
您可以非常轻松地做到这一点,但是问题在于您必须停止消费,因为它是一个阻塞循环或在消息消费期间进行生成。
消费和生产是一个正常的用例,尤其是在pika中,因为它不是线程安全的,例如,当您想要对消息实现某种形式的过滤器时,或者,也许是一个智能路由器,这反过来会将消息传递到另一个队列。
我认为你不应该想要。 MQ 表示 异步处理。 在我看来,在同一线程中进行消费和生产违背了目的。
我建议看看Celery(http://celery.readthedocs.org/en/latest/)来管理工人任务。 这样,您无需直接与 RMQ 集成,因为它将为您处理生产和消费。
但是,如果您确实希望直接与 RMQ 集成并管理您自己的工人,请查看 Kombu (http://kombu.readthedocs.org/en/latest/) 进行集成。 有非阻塞使用者和生产者,可以让你在同一事件循环中同时使用两者。
我认为你的问题的简单答案是肯定的。 但这取决于你想做什么。 我的猜测是你有一个循环,它从一个通道上的线程消耗,经过一些(小或大)处理后,它决定将其发送到另一个通道上的另一个队列(或交换),那么我完全看不出任何问题。 尽管最好将其调度到其他线程,但这不是必需的。
如果您提供有关过程的更多详细信息,那么它可能有助于给出更具体的答案。
Kombu是一个常见的python库,用于处理RabbitMQ(Celery在引擎盖下使用它)。 这里值得指出的是,对于我尝试过的最简单的 Kombu 用法,您的问题的答案是"不 - 您不能在同一个消费者回调线程上接收和发布。"
具体来说,如果队列中有几条消息供消费者使用,该消费者已为该主题注册了回调,并且该回调执行了一些处理并发布结果,则结果的发布将导致队列中的第二条消息在从第一条消息的发布返回之前命中回调 - 因此您最终会以递归调用回调。 如果队列中有 n 条消息,则调用堆栈在展开之前将结束 n 条消息。 显然,这很快就会爆炸。
一种解决方案(不一定是最好的)是让回调将消息发布到消费者内部的简单队列中,该队列可以在主进程线程上处理(即关闭回调线程)
def process_message(self, body: str, message: Message):
# Queue the message for processing off this thread:
print("Start process_message ----------------")
self.do_process_message(body, message) if self.publish_on_callback else self.queue.put((body, message))#
print("End process_message ------------------")
def do_process_message(self, body: str, message: Message):
# Deserialize and "Process" the message:
print(f"Process message: {body}")
# ... msg processing code...
# Publish a processing output:
processing_output = self.get_processing_output()
print(f"Publishing processing output: {processing_output}")
self.rabbit_msg_transport.publish(Topics.ProcessingOutputs, processing_output)
# Acknowledge the message:
message.ack()
def run_message_loop(self):
while True:
print("Waiting for incoming message")
self.rabbit_connection.drain_events()
while not self.queue.empty():
body, message = self.queue.get(block=False)
self.do_process_message(body, message)
在上面的这个片段中process_message是回调。 如果publish_on_callback为 True,您将在兔子队列上的 n 个消息的回调 n deep 中看到递归。 如果publish_on_callback为 False,则它可以正常运行,而不会在回调中递归。
另一种方法是为生产者交换使用第二个连接 - 与用于消费者的连接分开。 这也使使用消息和发布结果的回调在再次为队列上的下一条消息触发回调之前完成。