Asyncio和RabbitMQ:如何知道是否有更多的消息等待RabbitMQ



我如何知道在回调方法中,如果asyncio从RabbitMQ收到了更多的消息,留给我处理?

我同时从RabbitMQ接收一批消息。我需要处理它们,然后在最有可能收到批处理后将消息发送到另一个服务。我可以用异步计时器来做这件事,但检查asyncio是否有来自RabbitMQ的另一条消息会更简单。

我目前正在使用aio-pika库。代码看起来像这样:

async def subscribe_to_messages():
connection = await aio_pika.connect_robust(
host=host, port=port, virtualhost=virtualhost, login=login, password=password
)
channel = await connection.channel()
queue = await channel.declare_queue(queue_name, auto_delete=True)
await queue.bind(exchange=queue_ex_name, routing_key=queue_routing_key)
await queue.consume(callback=process_message, consumer_tag='my_tag')    

async def process_message(msg): 
# Process the message
# How do I know here if asyncio already has received another message for me? 
# Once I've received all messages for now, I will need to 
# send a message to another service.

根据文档,仅仅调用queue.consume就已经为每个接收到的消息调用了一次回调函数-无需担心。

您只需要编写一个函数来处理单个消息的内容-如果有其他消息等待,它将立即在下一个消息中再次调用-

尝试从消息内部检查队列,不仅会使您的函数复杂化,因为您可能会在双重处理某些消息时遇到许多极端情况(例如,您的函数从队列中读取下一个消息,处理它,返回并使用它刚刚处理的消息再次调用)

https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/6-rpc.html callback-queue

(注意代码注释说&;start processing queue&;)

等待queue. consumer;调用基本上不会返回,直到队列被某些外部事件关闭或发生异常。

最新更新