Running RabbitMQ Pika with Quart



我正在使用Quart框架,但我也需要使用RabbitMQ皮卡连接器,但我不能让他们玩得很好,因为他们都有无限循环。

入口点:

from quart import Quart
from .service import Service
app = Quart(__name__)
@app.before_serving
async def startup():
app.service_task = asyncio.ensure_future(service.start())
if not service.initialise():
sys.exit()

服务类:

class Service:
def __init__(self, new_instance):
self._connection = None
self._channel = None
self._messaging_thread = None
def initialise(self):
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('localhost', credentials=credentials)
self._connection = pika.BlockingConnection(parameters)
self._channel = self._connection.channel()
self._channel.queue_declare(queue='to_be_processed_queue')
self._channel.basic_consume(queue='to_be_processed_queue',
auto_ack=True,
on_message_callback=self.callback)

print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()
print('Thread created...')
def run_consume(self):
try:
self._channel.start_consuming()
except KeyboardInterrupt:
self._shutdown()

代码甚至没有到达print('Thread created…'),我不明白。从这个问题我明白RabbitMQ不是线程安全的,但我不明白如何运行RabbitMQ。

Pika不是线程安全的,正如你已经发现的,但这不是你的程序阻塞的原因。

您的问题可能在这里:

print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()

如果你从run_consume中去掉括号,效果会更好吗?现在你实际上不是在创建一个线程,而是在现场执行self.run_consume(),并且它没有退出。

self._messaging_thread = Thread(target=self.run_consume)

将是我的第一次尝试。

然而,由于Pika不是线程安全的,您还必须移动您的通道创建&东西到你的线程,而不是在主程序中这样做。如果你不在其他地方使用它,它可能会工作,但正确的方法是将所有内容包含在线程中,并且不像你现在这样在线程之间共享任何Pika结构。

最新更新