我们有一个微服务,它使用@RabbitListener使用消息并将数据持久化到数据库中,在成功处理消息时生成响应,并使用@sendTO将其发送到不同的队列进行审核。
在 HA 故障转移中运行 Rabbit 时,在连接丢失时发送响应时,当前正在处理的消息将正确返回到队列,但数据库事务(在本例中为 jpa 事务(未回滚,永远不会发送响应。
我从这个问题(https://github.com/spring-projects/spring-amqp/issues/696(中读到,这是"尽力而为的1PC"事务同步;RabbitMQ 不支持 XA 事务。兔子 tx 是在 DB tx 之后提交的,DB tx 可能会提交并且兔子回滚;您必须处理重复消息的微小可能性。
但是在我们的例子中,当我们重试请求时,我们将其视为重复消息,并且永远不会为此请求创建响应。 有没有办法让我们只能在连接丢失异常的情况下重试发送响应消息,而不是再次重新处理请求?我查看了ConditionalRejectingErrorHandler.DefaultExceptionStrategy,它只能访问原始请求,无法访问连接失败期间丢失的响应。请建议处理此问题的最佳方法是什么?
我们的代码如下所示:
SpringBootApplication
@EnableJpaRepositories("com.***")
@EnableJpaAuditing
@EnableTransactionManagement
@EnableEncryptableProperties
public class PcaClinicalValidationApplication {
@RabbitListener(queues = "myqueue"
@SendTo("exchange/routingKey")
@Timed) description = "Time taken to process a request")
public Message receivemessage(HashMap<String, Object> myMap, Message requestMessage)
throws Exception {
//business logic goes here
Message message = MessageBuilder.fromMessage(requestMessage)
//add some headers
return message;
}
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setRetryTemplate(new RetryTemplate());
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
//wrote to a file
serializer.serialize(failed);
return null;
});
return factory;
}
侦听器容器工厂在其replyTemplate
属性中使用RabbitTemplate
- 用于发送回复。
您可以在该RabbitTemplate
中配置RetryTemplate
以重试发送答复。
重试次数用尽后,您可以添加一个RecoveryCallback
该该将获得失败的回复,并且可以将其保存在某个地方并在重新传递发生时使用它。