我正在调用:
GetResponse response = channel.basicGet("some.queue", false); // no auto-ack
....
channel.basicAck(deliveryTag, ...);
然而,当我调用basicGet
时,队列中的消息保持在"Ready"状态,而不是"unconfirmed"状态。我希望它们处于未确认状态,以便我可以basic.ack
它们(从而从队列中丢弃它们),或者basic.nack
它们
我正在做以下操作来模拟延迟ack:
消耗时间
- 从初始队列获取(消费)消息。
- 创建一个"PendingAck_123456"队列
123456是消息的唯一id。
设置以下属性- x-message-ttlli>x-expires(确保临时队列将被删除)
- x-dead-letter-exchange和x-deal-letter-routing-key to requue toTTL到期时的初始队列.
- 将消息Pending ack发布到这个"PendingAck_123456"队列
- Ack消息以将其从初始队列中删除
在确认时间
- 从消息Id计算队列名称,并从"PendingAck_123456"队列中获取
- 确认(不需要调用
.getBody()
)。
这将从这个挂起队列中删除它,防止TTL重新请求它
评论
- 只有1条消息的队列。如果有很多这样的队列,这是一个问题吗?一个被请求的消息将在队列输入端被发送。而不是队列输出(就像真正的ack一样)…对消息顺序有影响。
- 消息被应用程序复制到等待队列。这是一个额外的步骤,可能对整体性能有影响。
- 要模拟Ack/Reject,您可能需要将消息复制到Initial Queue,并从PendingAck队列中Ack它。默认情况下,TTL会(稍后)执行此操作。
在get
之后立即做ack
,它工作得很好。然而,在我的情况下,它们被一个请求分开了。spring的模板在每次执行时关闭通道和连接。所以有三个选项:
- 在应用程序的整个生命周期中保持一个通道和连接打开
- 有某种对话范围(或最坏情况:使用会话)来存储相同的通道并重用它。
- 为每个请求使用一个通道,立即确认接收,并将消息存储在内存中。
在前两种情况下,你不能用spring的RabbitTemplate