在启用对多个vhost的动态支持后,监听默认虚拟主机(/)的侦听器未使用的消息



下面是我的配置:

我在我的应用程序中使用Springboot RabbitMQ

我需要在一个队列上支持侦听vhost。启用此应用程序后,停止工作以使用默认vhost中的消息。

public class RabbitMqConfig implements RabbitListenerConfigurer {
@Bean
public RabbitMessagingTemplate rabbitMessagingTemplate(
RabbitTemplate rabbitTemplate,
ConnectionFactory connectionFactory) {
RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
simpleRoutingConnectionFactory.setDefaultTargetConnectionFactory(connectionFactory);
rabbitTemplate.setConnectionFactory(simpleRoutingConnectionFactory);
rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
return rabbitMessagingTemplate;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
clients
.stream()
.forEach(client -> {
var endpoint = createEndpoint(client.getVhost());
var listenerContainerFactory = simpleRabbitListenerContainerFactory(client.getVhost());
listenerContainerFactory.setConnectionFactory(
annotationConfigWebApplicationContext.getBean((client.getVhost()),
ConnectionFactory.class));
registrar.registerEndpoint(endpoint, listenerContainerFactory);
});
}
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(String vHost) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setUsername(rabbitProperties.getUsername());
cf.setPassword(rabbitProperties.getPassword());
cf.setVirtualHost(vHost);
cf.setHost(rabbitProperties.getHost());
cf.setBeanName(vHost);
simpleRabbitListenerContainerFactory.setConnectionFactory(cf);
annotationConfigWebApplicationContext.getBeanFactory().registerSingleton(vHost, cf);
simpleRabbitListenerContainerFactory.setContainerCustomizer(simpleMessageListenerContainer -> {
simpleMessageListenerContainer.setQueueNames(graphqlMutationQueue);
simpleMessageListenerContainer.setLookupKeyQualifier(graphqlMutationQueue);
simpleMessageListenerContainer.setMessageListener(msg -> {
log.info(msg + " from VH: " + msg.getMessageProperties().getHeader(REPLY_TO_VHOST));
updateGraphqlListener.onMessage(msg);
});
simpleMessageListenerContainer.setAfterReceivePostProcessors(msg -> {
msg.getMessageProperties().setHeader(REPLY_TO_VHOST, vHost);
return msg;
});
simpleMessageListenerContainer.start();
});
return simpleRabbitListenerContainerFactory;
}
private SimpleRabbitListenerEndpoint createEndpoint(String vHost) {
var endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId(vHost);
endpoint.setQueueNames(graphqlMutationQueue);
endpoint.setMessageConverter(jsonMessageConverter());
endpoint.setMessageListener(new MessageListenerAdapter("OnMessage"));
return endpoint;
}
}

在应用程序启动时,我们正在发送消息给exchange:rabbitTemplate。convertAndSend (graphQLExchangeName stringutil的。空,Collections.EMPTY_MAP);

在这行之后,默认虚拟主机中的监听器不会使用执行消息。

编辑2:

@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory defaultConnectionFactory) {
return new RabbitTemplate(defaultConnectionFactory);
}
@Bean
RabbitAdmin rabbitAdminAdmin(ConnectionFactory defaultConnectionFactory) {
return new RabbitAdmin(defaultConnectionFactory);
}
@Bean
SimpleRabbitListenerContainerFactory defaultContFactory(ConnectionFactory defaultConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(defaultConnectionFactory);
return factory;
}



@RabbitListener(queues = "#{someQueue}", containerFactory = "defaultContFactory")
public void onMessage(Map<?, ?> map) {

}


@SpringBootApplication(exclude = RabbitAutoConfiguration.class)

Boot在检测到用户定义的基础架构bean时禁用自动配置。

您还需要手动配置默认vHost的连接工厂等。