RabbitMqJava-获取队列通道以检测是否存在未处理的消息



我们使用RabbitMq在一些服务之间进行通信。有时会有很多消息同时排队。我们希望能够看到仍有未处理的消息,即处理消息的服务是否繁忙。

我一直在寻找一种程序化的方法来检查队列是否有消息,并找到了这个

channel.queueDeclarePassive(queueName).getMessageCount()

问题是:我没有频道对象。我们的RabbitMq设置是几年前创建的,通常看起来是这样的:

@Configuration
@EnableRabbit
public class RabbitMqConfig {
public static final String RENDER_HTML_QUEUE = "render.html";
private String rabbitUri;
private int connectionTimeout;
private String exchangeName;
private int concurrentConsumers;
public RabbitMqConfig(
@Value("${rabbitmq.uri}") String rabbitUri,
@Value("${rabbitmq.exchange.name}") String exchangeName,
@Value("${rabbitmq.connection.timeout}") int timeout,
@Value("${rabbitmq.concurrent-consumers:1}") int concurrentConsumers) {
this.exchangeName = exchangeName;
this.rabbitUri = rabbitUri;
this.connectionTimeout = timeout;
this.concurrentConsumers = concurrentConsumers;
}
@Bean
DirectExchange directExchangeBean() {
return new DirectExchange(this.exchangeName, true, false);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RENDER_HTML_QUEUE);
container.setConcurrentConsumers(concurrentConsumers);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(RenderItemMessageConsumer receiver) {
return new MessageListenerAdapter(receiver, "reciveMessageFromRenderQueue");
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory;
try {
connectionFactory = new CachingConnectionFactory(new URI(this.rabbitUri));
connectionFactory.setConnectionTimeout(this.connectionTimeout);
} catch (URISyntaxException e) {
throw new ApiException(e, BaseErrorCode.UNKOWN_ERROR, e.getMessage());
}
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue renderRenderQueue() {
return new Queue(RENDER_HTML_QUEUE, true);
}
@Bean
Binding rendererRenderBinding() {
return BindingBuilder.bind(renderRenderQueue()).to(directExchangeBean()).with(
RENDER_HTML_QUEUE);
}
}

然后消息被消耗如下:

@Component
public class RenderItemMessageConsumer {
@RabbitListener(queues = RENDER_HTML_QUEUE)
public void reciveMessageFromRenderQueue(String message) {
//...
}

exchangeName是跨服务共享的。因此,通常我需要一种方法来获取可能为队列和连接创建的通道,以查看里面有多少消息。理想情况下,我希望在生成呈现服务中使用的消息的其他服务处访问该信息。

还是我做错了什么?我必须显式地创建一个通道并将队列连接到它吗?我甚至不确定幕后创建了什么频道,正如我所提到的,我几年前就建立了这个频道,在一切正常后没有深入挖掘。

我可以以某种方式使用amqpAdmin获取所有频道吗?

经过一番尝试,我可以回答自己的问题:

@Autowired
private ConnectionFactory connectionFactory;
public Boolean isBusy(String queue) throws IOException {
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false);
return channel.queueDeclarePassive(queue).getMessageCount() > 0;
}

由于我的所有服务都有类似的设置,将connectionFactory公开为bean,并且它们都使用相同的交换名称连接到共享的rabbitMq服务器,所以我可以使用任何服务来完成上述操作。我可以把这个片段放在一个rest资源后面,因此从我的管理UI可以请求关于我知道名称的队列的所有信息,以决定是否要向它发布另一批消息。

相关内容

最新更新