RabbitMQ基本.获取和确认



我正在调用:

GetResponse response = channel.basicGet("some.queue", false); // no auto-ack
....
channel.basicAck(deliveryTag, ...);

然而,当我调用basicGet时,队列中的消息保持在"Ready"状态,而不是"unconfirmed"状态。我希望它们处于未确认状态,以便我可以basic.ack它们(从而从队列中丢弃它们),或者basic.nack它们

我正在做以下操作来模拟延迟ack:

消耗时间

  1. 从初始队列获取(消费)消息。
  2. 创建一个"PendingAck_123456"队列
    123456是消息的唯一id。
    设置以下属性
    • x-message-ttlli>x-expires(确保临时队列将被删除)
    • x-dead-letter-exchangex-deal-letter-routing-key to requue toTTL到期时的初始队列.
  3. 将消息Pending ack发布到这个"PendingAck_123456"队列
  4. Ack消息以将其从初始队列中删除

在确认时间

  1. 从消息Id计算队列名称,并从"PendingAck_123456"队列中获取
  2. 确认(不需要调用.getBody())。
    这将从这个挂起队列中删除它,防止TTL重新请求它

评论

  • 只有1条消息的队列。如果有很多这样的队列,这是一个问题吗?一个被请求的消息将在队列输入端被发送。而不是队列输出(就像真正的ack一样)…对消息顺序有影响。
  • 消息被应用程序复制到等待队列。这是一个额外的步骤,可能对整体性能有影响。
  • 要模拟Ack/Reject,您可能需要将消息复制到Initial Queue,并从PendingAck队列中Ack它。默认情况下,TTL会(稍后)执行此操作。

get之后立即做ack,它工作得很好。然而,在我的情况下,它们被一个请求分开了。spring的模板在每次执行时关闭通道和连接。所以有三个选项:

  • 在应用程序的整个生命周期中保持一个通道和连接打开
  • 有某种对话范围(或最坏情况:使用会话)来存储相同的通道并重用它。
  • 为每个请求使用一个通道,立即确认接收,并将消息存储在内存中。

在前两种情况下,你不能用spring的RabbitTemplate

最新更新