我正在尝试将进入 RabbitMQ 队列的每条消息保存在数据库上,仅用于日志记录。之后,消息需要正常处理。
问题是:此队列具有配置了RetryOperationsInterceptor
的重试策略,并且每次处理消息时出现一些错误时,消息都会重新排队并再次处理。保存消息的逻辑位于读取队列的侦听器中,因此我3
(我配置的重试次数(不是只保存在数据库中的一条消息。
查看我的RetryOperationsInterceptor
:
@Bean
public RetryOperationsInterceptor defaultRetryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(2000, 2.0, 10000)
.build();
}
集装箱工厂:
@Bean(name = FACTORY_CONTAINER_NAME)
public SimpleRabbitListenerContainerFactory factoryQueueExample(ConnectionFactory connectionFactory,
RetryOperationsInterceptor defaultRetryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(getMessageConverter());
factory.setDefaultRequeueRejected(false);
Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
return factory;
}
队列侦听器:
@Slf4j
@Component
@AllArgsConstructor
public class MessageListener {
private final MessageRepository messageRepository;
private final MessageService messageService;
@RabbitListener(queues = MessageConfiguration.QUEUE,
containerFactory = MessageConfiguration.FACTORY_CONTAINER_NAME)
public void process(SomeMessage someMessage) {
messageRepository.save(someMessage); // transform to a entity and save on database
messageService.process(someMessage); // process message
}
}
我不知道是否是相关信息,但这个队列也有一个相关的 DLQ。重试后,邮件将进入 DLQ 队列。
我的想法是在重试拦截器中找到一些可以在第一次尝试时调用服务的东西,只保存一次消息。
我也愿意接受解决此问题的其他想法,例如将尝试次数与消息一起保存,只是为了表明这不是保存在数据库中的重复消息,而是不同尝试中的相同消息。
请注意如何应用重试:
Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
与编写自己的MethodInterceptor
写入数据库的方式相同。 当您在该defaultRetryOperationsInterceptor
之前按顺序定义此自定义建议时,DB save 将仅调用一次,只是因为此类操作将在重试之外甚至之前发生。
有关 AOP 的更多信息,请参阅文档:https://docs.spring.io/spring/docs/5.0.6.RELEASE/spring-framework-reference/core.html#aop
首先,我要感谢Artem和Garry帮助我如何解决我的问题。
至少有四种解决方案:
Advice
解决方案(见加里的回答(- 柜台从
RetrySynchronizationManager
messageId
解决方案- 使用有状态
RetryOperationsInterceptor
RetrySynchronizationManager
选项中的计数器可以很容易地用于我的情况:
int attempts = RetrySynchronizationManager.getContext().getRetryCount();
boolean canSaveOnDatabase = attempts == 0;
messageId
选项对于发件人发送唯一标识该邮件的 ID 是必需的。因此,我可以在数据库中保存和使用这些信息,以了解消息是否已持久化。这可以由 MessagePostProcessor 在从RabbitTemplate
调用某些send
方法时进行配置。
最后一个选项,使用状态RetryOperationsInterceptor
我可以在RabbitListener
中查找标题@Header(AmqpHeaders.REDELIVERED) boolean redelivered
.如果您使用无状态RetryOperationsInterceptor
(我的情况(,则此选项不起作用,并且此选项还需要messageId
。