Spring RabbitMQ:一个@RabbitListener多个队列,具有指定队列的优先级



我有两个队列-PRIORITY_QUEUESIMPLE_QUEUESIMPLE_QUEUE很大,其中的消息数量达到数千条,PRIORITY_QUEUE可以为空,有时只接收少量消息。当PRIORITY_QUEUE中有消息时,我需要确保来自SIMPLE_QUEUE的消息不会被读取,并且如果优先级队列中有消息,则阻止/暂停读取SIMPLE_QUEUE。当PRIORITY_QUEUE收到消息时,我们必须暂停读取SIMPLE_QUEUE

预取属性 :我们通过一个读取消息

spring.rabbitmq.listener.simple.prefetch=1

监听器示例

@RabbitListener(queues = {priorityQueue , simpleQueue})
public void processMyQueue(String message) {
// if priorityQueue is not empty we shouldn't consume messages from simpleQueue
}

兔子配置

@Slf4j
@Configuration
@PropertySource({"...,..."})
@RequiredArgsConstructor
public class RabbitConfiguration {
@Value("${spring.rabbitmq.host}")
private String queueHost;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new 
CachingConnectionFactory(queueHost);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean(value = "simpleQueue")
public Queue simpleQueue() {
return new Queue("simpleQueue");
}
@Bean(name = "priorityQueue")
public Queue priorityQueue() {
return new Queue("priorityQueue");
}
}

您不能使用单个侦听器;使用两个侦听器,并在优先级侦听器接收到任何消息时停止简单侦听器容器;然后在优先级侦听器空闲时启动简单队列侦听器。

您可以使用ListenerContainerIdleEvent来检测空闲侦听器。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#consumer-事件

使用RabbitListenerEndpointRegistry停止/启动容器。

最新更新