如何从连接本身以外的其他通道恢复未确认的 AMQP 消息?



似乎我保持我的rabbitmq服务器运行的时间越长,我有更多的麻烦与未确认的消息。我想重新排队。实际上,似乎有一个amqp命令可以做到这一点,但它只适用于您的连接正在使用的通道。我构建了一个小的皮卡脚本,至少尝试一下,但我要么错过了一些东西,要么不能这样做(如何与rabbitmqctl?)

import pika
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,
    credentials=credentials, virtual_host='***')
def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body
def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    
def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    
try:
    connection = pika.SelectConnection(parameters=parameters,
        on_open_callback=on_connected)    
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

未确认的消息是那些已经通过网络传递给消费者但尚未被回复或拒绝的消息,但该消费者尚未关闭最初接收它们的通道或连接。因此,代理无法确定消费者只是花了很长时间来处理这些消息,还是已经忘记了这些消息。因此,它使它们处于未确认状态,直到消费者死亡或它们被返回或拒绝。

由于这些消息在将来仍然可以由最初消费它们的仍然存活的消费者有效地处理,因此(据我所知)您不能将另一个消费者插入到混合中并试图对它们做出外部决策。您需要修改您的消费者在处理每条消息时做出决定,而不是留下未确认的旧消息。

如果消息被解除锁定,只有两种方法可以让它们重新进入队列:

  1. basic.nack

    此命令将导致消息被放回队列并重新发送。

  2. 断开与代理的连接

  3. 此操作将强制将此通道中所有未锁定的消息放回队列。

注意:基本。Recover将尝试在同一通道上(对同一消费者)重新发布未解密的消息,这有时是期望的行为。

RabbitMQ基本规范。Recover和basic.nack


真正的问题是:为什么消息未被确认?

可能导致未锁定消息的场景:

  1. 消费者获取了太多的消息,然后没有足够快地处理和打包它们。

  2. 有bug的客户端库(我目前在pika 0.9.13有这个问题。如果队列有很多消息,那么一定数量的消息将被卡住,甚至在几个小时后。

    解决方案:我必须重新启动消费者几次,直到所有未锁定的消息都从队列中消失。

一旦所有工人/消费者停止,所有未确认的消息将进入就绪状态。

通过在ps aux输出上确认grep,并在发现时停止/杀死所有工人来确保所有工人停止。

如果你正在使用supervisor管理worker,它显示worker已停止,你可能需要检查是否有僵尸。Supervisor报告worker将被停止,但是当在ps - aux输出上执行grepped时,您仍然会发现僵尸进程正在运行。杀死僵尸进程将使消息回到就绪状态。

最新更新