Spring AMQP 如何在使用 @SendTo 发送时连接丢失时访问响应消息



我们有一个微服务,它使用@RabbitListener使用消息并将数据持久化到数据库中,在成功处理消息时生成响应,并使用@sendTO将其发送到不同的队列进行审核。

在 HA 故障转移中运行 Rabbit 时,在连接丢失时发送响应时,当前正在处理的消息将正确返回到队列,但数据库事务(在本例中为 jpa 事务(未回滚,永远不会发送响应。

我从这个问题(https://github.com/spring-projects/spring-amqp/issues/696(中读到,这是"尽力而为的1PC"事务同步;RabbitMQ 不支持 XA 事务。兔子 tx 是在 DB tx 之后提交的,DB tx 可能会提交并且兔子回滚;您必须处理重复消息的微小可能性。

但是在我们的例子中,当我们重试请求时,我们将其视为重复消息,并且永远不会为此请求创建响应。 有没有办法让我们只能在连接丢失异常的情况下重试发送响应消息,而不是再次重新处理请求?我查看了ConditionalRejectingErrorHandler.DefaultExceptionStrategy,它只能访问原始请求,无法访问连接失败期间丢失的响应。请建议处理此问题的最佳方法是什么?

我们的代码如下所示:

SpringBootApplication
@EnableJpaRepositories("com.***")
@EnableJpaAuditing
@EnableTransactionManagement
@EnableEncryptableProperties
public class PcaClinicalValidationApplication {
@RabbitListener(queues = "myqueue"
@SendTo("exchange/routingKey")
@Timed) description = "Time taken to process a request")
public Message receivemessage(HashMap<String, Object> myMap, Message requestMessage)
throws Exception {
//business logic goes here
Message message = MessageBuilder.fromMessage(requestMessage)
//add some headers
return message;
}
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setRetryTemplate(new RetryTemplate());
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);          
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
//wrote to a file          
serializer.serialize(failed);
return null;
});
return factory;
}

侦听器容器工厂在其replyTemplate属性中使用RabbitTemplate- 用于发送回复。

您可以在该RabbitTemplate中配置RetryTemplate以重试发送答复。

重试次数用尽后,您可以添加一个RecoveryCallback该该将获得失败的回复,并且可以将其保存在某个地方并在重新传递发生时使用它。

最新更新