管理不同线程中的 RabbitMQ 自动删除队列.蟒



我想知道这是否是管理不同线程中auto_delete队列的正确方法(主要用于测试我不希望 RabbitMQ 队列在连接关闭时停留的问题(

import pika
from threading import Thread
class ConsumerThread(Thread):
def __init__(self, callback, queue):
Thread.__init__(self)
self.setDaemon(True)
self.callback = callback
self.queue = queue
def run(self):
# stablish connection
connection = pika.BlockingConnection(pika.ConnectionParameters(CONNECTION['address'], CONNECTION['port'], CONNECTION['vhost'], CONNECTION['credentials']))
channel = connection.channel()
# create the auto-delete queue
channel.queue_declare(queue=self.queue, auto_delete=True)
# start consuming
channel.basic_qos(prefetch_count=1)
channel.basic_consume(self.callback, queue=self.queue)
channel.start_consuming()
class Factory:
def __init__(self):
self.queue_init = "init.queue"
self.queue_start = "start.queue"
threads = [ConsumerThread(self.init_callback, self.queue_init), ConsumerThread(self.start_callback, self.queue_start)]
for t in threads:
t.start()
def init_callback(self, ch, method, properties, body):
# doing something
def start_callback(self, ch, method, properties, body):
# doing something

RabbitMQ 团队监控rabbitmq-users邮件列表,并且只偶尔回答 StackOverflow 上的问题。


Pika 不是线程安全的。必须确保BlockingConnection方法调用发生在运行连接和通道的同一线程上。根据您的代码,我不确定这是否会发生,因为您在Factory类中调用回调,这似乎很奇怪。为什么不在ConsumerThread中使用这些方法呢?

Pika0.12及更高版本将包含一个add_callback_threadsafe方法,该方法将调度要在 ioloop 线程上执行的方法。

最新更新