rabbitMQ 创建两个单独的使用者,它们在同一服务中从两个不同的集群使用



我正在尝试创建两个单独的使用者,他们在同一服务中从两个不同的集群消费。

我尝试创建新连接,但仍然仅在其中一个集群中创建两个交换。

我在这里错过了一些东西? 我正在使用弹簧引导:2.2.5.发布和弹簧兔:2.2.5.发布

我的配置是这样的。

@Configuration
@AllArgsConstructor
public class RabbitMQConfiguration {
private final Connection_A_MQProperties connectionAMQProperties;
private final Connection_B_MQProperties connectionBMQProperties;
@Primary
@Bean
public ConnectionFactory connectionFactoryA(Connection_A_MQProperties connectionAMQProperties) {
return createConnection(connectionAMQProperties.getBaseProperties());
}
@Bean
public ConnectionFactory connectionFactoryB(Connection_B_MQProperties connectionBMQProperties) {
return createConnection(connectionBMQProperties.getBaseProperties());
}
private ConnectionFactory createConnection(BaseProperties baseProperties){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(baseProperties.getHost());
factory.setPort(baseProperties.getPort());
factory.setUsername(baseProperties.getUsername());
factory.setPassword(baseProperties.getPassword());
factory.setConnectionTimeout(baseProperties.getConnectionTimeout());
factory.setRequestedHeartBeat(baseProperties.getRequestedHeartBeat());
factory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
factory.setConnectionCacheSize(2);
return factory;
}
@Primary
@Bean("connection_A_RabbitAdmin")
public RabbitAdmin connection_A_RabbitAdmin(@Qualifier("connection_A_RabbitTemplate") RabbitTemplate connection_A_RabbitTemplate) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_A_RabbitTemplate);
rabbitAdmin.setExplicitDeclarationsOnly(true);
return rabbitAdmin;
}
@Bean("connection_B_RabbitAdmin")
public RabbitAdmin connection_B_RabbitAdmin(@Qualifier("connection_B_RabbitTemplate") RabbitTemplate connection_B_RabbitTemplate) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_B_RabbitTemplate);
rabbitAdmin.setExplicitDeclarationsOnly(true);
return rabbitAdmin;
}
/**
* each declarable has been configured with a AbstractDeclarable.setAdminsThatShouldDeclare() method which contains the particular admin bean 
* for which declarable should be processed. i have checked this is getting filtered out correctly for each of the rabbit admins.
**/
@Bean("connection_A_Declarable")
public Declarables connection_A_Declarable(@Qualifier("connection_A_RabbitAdmin") RabbitAdmin connection_A_RabbitAdmin) {
return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_A_RabbitAdmin))
.baseProperties(connectionAMQProperties)
.queueNames(Collections.singletonList(connectionAMQProperties.getQueue()))
.build();
}
@Bean("connection_B_Declarable")
public Declarables connection_B_Declarable(@Qualifier("connection_B_RabbitAdmin") RabbitAdmin connection_B_RabbitAdmin){
return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_B_RabbitAdmin))
.baseProperties(connectionBMQProperties)
.queueNames(Collections.singletonList(connectionBMQProperties.getQueue()))
.build();
}
@Primary
@Bean("connection_A_RabbitTemplate")
public RabbitTemplate connection_A_RabbitTemplate(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
return rabbitTemplate(connectionFactoryA,connectionAMQProperties);
}
@Bean("connection_B_RabbitTemplate")
public RabbitTemplate connection_B_RabbitTemplate(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB) {
return rabbitTemplate(connectionFactoryB,connectionBMQProperties);
}
private RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,BaseProperties baseProperties) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setExchange(baseProperties.getExchange());
template.setRoutingKey(baseProperties.getQueueName());
template.setDefaultReceiveQueue(baseProperties.getQueueName());
return template;
}
@Primary
@Bean(name = "connection_A_ContainerFactory")
public SimpleRabbitListenerContainerFactory connection_A_ContainerFactory(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
return simpleRabbitListenerContainerFactory(connectionFactoryA,connectionAMQProperties);
}
@Bean(name = "connection_B_ContainerFactory")
public SimpleRabbitListenerContainerFactory connection_B_ContainerFactory(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB){
return simpleRabbitListenerContainerFactory(connectionFactoryB,connectionBMQProperties);
}
private SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory, BaseProperties baseProperties) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(baseProperties.getConcurrentConsumer());
factory.setMaxConcurrentConsumers(baseProperties.getMaxConcurrentConsumer());
return factory;
}
}

并且列表器配置为(类似于第二个侦听器(

@RabbitListener(
queues = "${connection_a.queue}",
containerFactory = "connectionFactoryA"
)

一些日志(编辑以隐藏一些信息((在这里您可以看到交易所和队列都由各自的管理员正确声明(。

o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: **-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory       : Created new connection: **ConnectionFactory#5cba890e:0/SimpleConnection@513bec8c [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64992]
o.s.retry.support.RetryTemplate          : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://client-user@172.27.**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://client-user@172.27.**.**:5672/,1), conn: Proxy@1d6014a7 Shared Rabbit Connection: SimpleConnection@513bec8c [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64992]
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.connectionA.exchange'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.connectionA.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionA.**'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionA.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished
o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: message-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory       : Created new connection: **ConnectionFactory#dbca149:0/SimpleConnection@59cb10e0 [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64994]
o.s.retry.support.RetryTemplate          : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://client-user@172.27.**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://client-user@172.27.**.**:5672/,1), conn: Proxy@3d763ae5 Shared Rabbit Connection: SimpleConnection@59cb10e0 [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64994]
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.**.connectionB.exchange'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.**.connectionB.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionB.**'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionB.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished

这对我来说是预期的:

@SpringBootApplication
public class So62382630Application {
public static void main(String[] args) {
SpringApplication.run(So62382630Application.class, args);
}
@Bean
@Primary
ConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}
@Bean
ConnectionFactory cf2() {
return new CachingConnectionFactory("10.0.0.21");
}
@Bean
RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
Queue q1() {
Queue queue = new Queue("q1");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
Queue q2() {
Queue queue = new Queue("q2");
queue.setAdminsThatShouldDeclare(admin2());
return queue;
}
@Bean
public ApplicationRunner runner(RabbitAdmin admin1, RabbitAdmin admin2) {
return args -> {
System.out.println(admin1.getQueueInfo("q1"));
System.out.println(admin2.getQueueInfo("q2"));
};
}
}

我看到在相应节点上声明的队列。

最新更新