我已经阅读了关于这个主题的多个答案,我的配置似乎应该正常工作,但由于某些原因,它没有正常工作。
这是配置:
@Bean Queue intakeQueue(String name) { return new Queue(name, true); }
@Bean Exchange dlx(String name) { return new DirectExchange(name); }
@Bean Queue dlq(String name) { return new Queue(name, false, false, true); }
@Bean
Binding dlb(Exchange dlx, Queue dlq, Queue reply) {
return BindingBuilder.bind(dlq).to(dlx).with(reply.getName()).noargs();
}
@Bean
Queue replyQueue(String name, Exchange dlx) {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", dlx.getName());
args.put("x-dead-letter-routing-key", name);
return new Queue(name, true, false, false, args);
}
RabbitMQ UI显示回复队列具有DLX
和DLK
属性。
我发送类似的信息
this.rabbit.convertSendAndReceive(intakeQueue, obj, message -> {
message.getMessageProperties().setPriority(10);
return message;
});
消息处理程序在收到消息后立即抛出AmqpRejectAndDontRequeueException
。这只是为了测试。我从重试建议开始,但由于它并没有产生任何结果,我简化了测试用例。
public Object handleMessage(Object obj) throws IOException {
throw new AmqpRejectAndDontRequeueException("Testing retries!");
}
我现在看到两个问题:
- 抛出ARADRE后,DLQ中从未显示消息。但是,如果我直接从
handleMessage
发布到DLQ,则会发生这种情况 convertSendAndReceive
没有收到任何回复(可能是异常?),一直等到超时发生,在我的情况下是5分钟。这可能是有意的,但对于RPC风格的调用来说,这相当奇怪
我有没有遗漏或误解什么?
从这个意义上说,它不是RPC;在侦听器中引发异常将不会传播回发送方。
您拒绝intakeQueue
上的传递,消息将路由到其DLX/DLQ(如果已配置)。
对于您的测试用例,您应该查看那个DLQ(如果有),而不是回复队列的DLQ。
RabbitMQ对队列关系一无所知。您不显示容器或RabbitTemplate
配置,但默认情况下,使用直接replyTo路由(使用"特殊"内部队列)。
您可以将rabbit模板配置为使用固定的回复队列,但随后必须提供一个回复侦听器容器,如文档中所述。
如果发件人在收到回复时超时,模板会抛出一个ARADRE
,因此在这种情况下,回复将是死信。
为了测试这一点,在你的倾听者中睡眠超过超时时间,然后回复;然后,您应该会看到回复转到回复队列的DLQ。
如果要将异常传播给调用者,则需要将其作为handleMessage
返回值返回。
目前还没有逻辑来重新抛出这样的异常,但是,您必须在代码中执行此操作(检测回复是否为异常并重新抛出)。
当然,异常类型必须是Serializable
。
您还可以在AMQP上使用SpringRemoting,将负责传播异常。