RabbitMQ message is not sent to the dead-letter queue when an exception occurs



I have this RabbitMQ Spring Boot configuration:

@Configuration
public class RabbitConfiguration {

    // Main queue configuration
    @Value("${rabbitmq.main.messages.queue}")
    private String mainQueueName;
    @Value("${rabbitmq.main.exchange.queue}")
    private String mainExchangeName;
    @Value("${rabbitmq.main.routing.key}")
    private String mainRoutingKey;

    // DLQ configuration
    @Value("${rabbitmq.dlq.messages.queue}")
    private String dlqQueueName;
    @Value("${rabbitmq.dlq.exchange.queue}")
    private String dlqExchangeName;
    @Value("${rabbitmq.dlq.routing.key}")
    private String dlqRoutingKey;

    // Connectivity
    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;
    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;
    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;
    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    // Not delivered messages, will be used eventually
    @Value("${rabbitmq.not.delivered.messages.queue}")
    private String notDeliveredMessagesQueue;

    // status with delivered messages (callback default)
    @Value("${rabbitmq.delivered.messages.queue}")
    private String deliveredMessagesQueue;

    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange(dlqExchangeName);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(mainExchangeName);
    }

    @Bean
    Queue dlq() {
        return QueueBuilder.durable(dlqQueueName).build();
    }

    @Bean
    Queue queue() {
        return QueueBuilder
                .durable(mainQueueName)
                .withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", dlqQueueName).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with(dlqRoutingKey);
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(mainQueueName);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitmqHost);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory());
        admin.declareQueue(queue());
        admin.declareExchange(exchange());
        admin.declareBinding(binding());
        return admin;
    }

    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

The problem is that when an exception is thrown, the message is not sent to the DLQ.

Consumer:

@RabbitListener(queues = { "${rabbitmq.main.messages.queue}" })
public void recievedMessage(@Payload Mensagem item, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InvalidMessageException {
    if (item.getIdCliente().equals("69")) {
        logger.info("Something went wrong to: " + item);
        throw new InvalidMessageException();
    } else {
        logger.info("==> Message consumed successfully: " + item);
    }
}

This is my configuration in application.properties:

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=3s
spring.rabbitmq.listener.simple.retry.max-attempts=2
spring.rabbitmq.listener.simple.retry.multiplier=2
spring.rabbitmq.listener.simple.retry.max-interval=10s

When I deliberately throw an exception just to see the message move to the DLQ, nothing happens. What is wrong here? What am I missing?

Try adding

spring.rabbitmq.listener.simple.default-requeue-rejected=false

to your application.properties. I think the problem is that failed deliveries are being requeued instead of being sent to the DLQ.
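
To make the difference concrete, here is a toy in-memory model of what the broker does when a consumer rejects a message. It is plain Java and only a sketch: DeadLetterModel, bind and reject are hypothetical names made up for illustration, not part of the RabbitMQ client or Spring AMQP. It also shows a second thing that may be worth checking: the question passes the literal string "deadLetterExchange" as x-dead-letter-exchange and dlqQueueName as x-dead-letter-routing-key, while the exchange and binding are declared from dlqExchangeName and dlqRoutingKey. The property values are not shown, so this may or may not apply, but if the names differ the broker drops the dead-lettered message without reporting an error.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of broker-side dead-lettering (illustrative only, not a real API). */
class DeadLetterModel {
    /** Direct exchanges: exchange name -> (binding key -> bound queue). */
    final Map<String, Map<String, Deque<String>>> exchanges = new HashMap<>();
    /** The main queue the consumer reads from. */
    final Deque<String> mainQueue = new ArrayDeque<>();
    /** Stands in for the queue's x-dead-letter-exchange argument. */
    final String deadLetterExchange;
    /** Stands in for the queue's x-dead-letter-routing-key argument. */
    final String deadLetterRoutingKey;

    DeadLetterModel(String deadLetterExchange, String deadLetterRoutingKey) {
        this.deadLetterExchange = deadLetterExchange;
        this.deadLetterRoutingKey = deadLetterRoutingKey;
    }

    /** Binds a queue to a direct exchange, creating the exchange entry if needed. */
    void bind(String exchange, String bindingKey, Deque<String> queue) {
        exchanges.computeIfAbsent(exchange, k -> new HashMap<>()).put(bindingKey, queue);
    }

    /** Models the consumer rejecting the message at the head of the main queue. */
    String reject(boolean requeue) {
        String message = mainQueue.pollFirst();
        if (message == null) {
            return "EMPTY";
        }
        if (requeue) {
            // requeue=true: the message goes back to the main queue, never to the DLQ
            mainQueue.addFirst(message);
            return "REQUEUED";
        }
        // requeue=false: route through the dead-letter exchange, if it exists
        Map<String, Deque<String>> bindings = exchanges.get(deadLetterExchange);
        Deque<String> target = bindings == null ? null : bindings.get(deadLetterRoutingKey);
        if (target == null) {
            // unknown exchange or no binding with that key: the message is lost
            return "DROPPED";
        }
        target.addLast(message);
        return "DEAD_LETTERED";
    }

    public static void main(String[] args) {
        Deque<String> dlq = new ArrayDeque<>();
        DeadLetterModel matching = new DeadLetterModel("dlx", "dlq-key");
        matching.bind("dlx", "dlq-key", dlq);
        matching.mainQueue.add("msg");
        System.out.println(matching.reject(true));   // REQUEUED
        System.out.println(matching.reject(false));  // DEAD_LETTERED

        // Queue argument names an exchange that was never declared under that name
        DeadLetterModel mismatched = new DeadLetterModel("deadLetterExchange", "dlq-key");
        mismatched.bind("dlx", "dlq-key", new ArrayDeque<>());
        mismatched.mainQueue.add("msg");
        System.out.println(mismatched.reject(false)); // DROPPED
    }
}
```

In the real setup, the first case corresponds to a rejected delivery being requeued, and the third to a queue whose dead-letter arguments do not match the declared exchange name or binding key.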
