如何在RabbitMQ中重新排队消息



消费者收到消息后,消费者/工作人员进行一些验证,然后调用web服务。在这个阶段,如果出现任何错误或验证失败,我们希望将消息放回最初使用的队列。

我已经阅读了RabbitMQ文档。但我对拒绝、nack和取消方法之间的区别感到困惑。

简短回答:

要重新排队特定的消息,您可以选择basic.rejectbasic.nackmultiple标志设置为false。

如果您正在使用消息确认,并且在特定时间消费者上有未确认的消息,并且消费者在未确认的情况下退出,则basic.consume调用也可能导致消息重新传递。

basic.recover将在特定信道上重新传送所有未确认的消息。

长答案:

basic.rejectbasic.nack都用于相同的目的——丢弃或重新排队特定消费者无法处理的消息(在给定时刻,在特定条件下或根本无法处理(。它们之间的主要区别在于basic.nack支持批量消息处理,而basic.reject不支持。

RabbitMQ官方网站上的否定确认文章中描述了这种差异:

AMQP规范定义了basic.reject方法,该方法允许客户端拒绝单个传递的消息,指示代理丢弃或重新排队。不幸的是,basic.reject不支持批量否定确认消息。

为了解决这个问题,RabbitMQ支持basic.nack方法,该方法提供了basic.reject的所有功能,同时还允许批量处理消息

为了批量拒绝消息,客户端将basic.nack方法的multiple标志设置为true。然后,代理将拒绝所有未确认、已传递的消息,直到并包括basic.nack方法的delivery_tag字段中指定的消息。在这方面,basic.nack补充了basic.ack的批量确认语义。

注意,basic.nack方法是RabbitMQ特定的扩展,而basic.reject方法是AMQP 0.9.1规范的一部分。

对于basic.cancel方法,它用于通知服务器客户端停止消息消费。注意,该客户端可以在basic.cancel方法发送和接收cancel-ok回复之间接收任意消息编号。如果客户端使用消息确认,并且它有任何未确认的消息,则它们将被移回最初使用的队列。

basic.recover在RabbitMQ中有一些局限性:它-基本.recover with requeue=false-基本覆盖同步

除了勘误表之外,根据RabbitMQ规范,basic.recover还提供部分支持(不支持requee=false的恢复。(

关于basic.consume:的注意事项

basic.consume在没有自动确认的情况下启动时(no­ack=false(,并且存在一些未确认的挂起消息时,当消费者被取消(死亡、致命错误、异常等(时,挂起消息将被重新传递。从技术上讲,在消费者发布(ack/nack/reject/recover(之前,不会处理这些挂起的消息(甚至是死信(。只有在那之后,它们才会被处理(例如,死信(。

例如,假设我们最初连续发布5条消息:

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)

然后消费其中的3个,但不确认,然后取消消费者。我们将遇到这种情况:

Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)

其中星号(*(注意到redelivered标志被设置为true

假设我们有死信交换集和死信消息队列的情况

Exchange(e-main)                                   Exchange(e-dead) 
  Queue(main){x-dead-letter-exchange: "e-dead"}       Queue(dead) 

假设我们发布了expire属性设置为5000(5秒(的5条消息:

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)

然后我们从main队列中消耗3条消息,并将它们保存10秒:

Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)

其中感叹号(!(代表未确认的消息。这样的消息不能传递给任何消费者,通常也不能在管理面板中查看。但让我们取消消费者,记住,它仍然持有3条未确认的消息:

Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)

所以现在,头上的3条消息被放回了原始队列,但由于它们设置了每条消息的TTL,所以它们被死信发送到死信队列的尾部(当然,通过死信交换(。

p.S.:

消费消息(即侦听新消息(与直接队列访问(在不照顾其他消息的情况下获取一条或多条消息(有某种不同。详见basic.get方法说明。

相关内容

  • 没有找到相关文章

最新更新