使用 Spring AMQP Java 配置为每个队列配置专用侦听器容器



我在XML中配置了这样的侦听器

<rabbit:listener-container connection-factory="connectionFactory" concurrency="1" acknowledge="manual">
<rabbit:listener ref="messageListener" queue-names="${address.queue.s1}" exclusive="true"/>
<rabbit:listener ref="messageListener" queue-names="${address.queue.s2}" exclusive="true"/>
<rabbit:listener ref="messageListener" queue-names="${address.queue.s3}" exclusive="true"/>
<rabbit:listener ref="messageListener" queue-names="${address.queue.s4}" exclusive="true"/>
<rabbit:listener ref="messageListener" queue-names="${address.queue.s5}" exclusive="true"/>
<rabbit:listener ref="messageListener" queue-names="${address.queue.s6}" exclusive="true"/>
</rabbit:listener-container>

我正在尝试将其移动到Java配置,但我看不到将多个MessageListener添加到ListenerContainer的方法。在我的情况下,创建多个 ListenerContainer bean 不是一个选项,因为我不知道从运行时开始消耗的队列数量。队列名称将来自配置文件。

我做了以下工作

@PostConstruct
public void init() 
{
for (String queue : queues.split(","))
{
// The Consumers would not connect if I don't call the 'start()' method.
messageListenerContainer(queue).start();
}
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(String queue)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(queue);
container.setMessageListener(messageListener());
// Set Exclusive Consumer 'ON'
container.setExclusive(true);
// Should be restricted to '1' to maintain data consistency.
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}

它"排序"工作,但我看到一些奇怪的行为,打开了很多幽灵通道,这在 XML 配置中从未发生过。所以这让我怀疑我做错了什么。我想知道在 Java 配置中创建 MessageListenerContainers 的正确方法?简单地说,"Spring 如何将具有多个 'rabbit:listener'' 的 'rabbit:listener-container' 正确地转换为 java 对象?对此的任何帮助/见解将不胜感激。

商业案例我们有一个发布用户配置文件更新的发布者。发布者可以为同一用途调度多个更新,我们必须以正确的顺序处理它们,以维护数据存储中的数据完整性。

示例:用户:ABC,发布 -> {UsrA:Change1,...., UsrA:Change 2,....,UsrA:Change3} -> Consumer HAS 必须按该顺序处理 {UsrA:Change1,...., UsrA:Change 2,....,UsrA:Change 3}。

在之前的设置中,我们有 1 个队列获取所有用户更新,我们有一个并发 = 5 的使用者应用。有多个应用服务器运行使用者应用。这导致*5*">消费者应用的实例数"通道/线程*可以处理传入的消息。速度很棒!但是我们经常有无序处理,导致数据损坏。

为了保持严格的FIFO顺序并尽可能并行处理消息,我们实现了队列分片。我们有一个"x-consistent-hash,在employee-id上有一个hash-header。我们的发布者将消息发布到哈希交换,并且我们有多个分片队列绑定到哈希交换。这个想法是,我们将给定用户(例如用户 A)的所有更改排队在同一分片中。然后,我们让消费者以"独占"模式和"并发消费者 = 1"连接到分片队列并处理消息。这样,我们就可以确保以正确的顺序处理消息,同时仍然并行处理消息。我们可以通过增加分片的数量来使其更加并行。

现在进入消费者配置

我们已在多个应用服务器上部署了使用者应用。

原始方法:

正如您在上面看到的,我只是在我的消费者应用程序中向我的"rabbit:listener-container"添加了多个"rabbit:listener",除了首先启动的服务器在所有分片队列上获得独占锁之外,它运行良好,其他服务器只是坐在那里不工作。

新方法:

我们将分片队列名称移动到应用程序配置文件中。这样

Consumer Instance 1 : Properties
queues=user.queue.s1,user.queue.s2,user.queue.s3
Consumer Instance 2 : Properties
queues=user.queue.s4,user.queue.s5,user.queue.s6

同样值得注意的是,我们可以拥有任意数量的消费者实例,并且根据资源可用性,分片可能会在实例之间不均匀地分布。

将队列名称移动到配置文件后,XML confiugration将不再起作用,因为我们无法像以前那样将"rabbit:listener"动态添加到我的"rabbit:listener-container"中。

然后我们决定切换到 Java 配置。这就是我们被困的地方!

我们最初是这样做的

@Bean
public SimpleMessageListenerContainer messageListenerContainer()
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(queues.split(","));
container.setMessageListener(messageListener());
container.setMissingQueuesFatal(false);
// Set Exclusive Consumer 'ON'
container.setExclusive(true);
// Should be restricted to '1' to maintain data consistency.
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
return container;
}

它可以工作,但我们所有的队列都在一个连接上共享 1 个通道。这对速度不利。我们想要的是一个连接,每个队列都有自己的通道。

下一步

这里还没有成功!.我最初问题中的 java 配置是我们现在所处的位置。

我很困惑为什么这很难做到。 显然,XML配置做了一些在Java中不容易做到的事情(或者至少对我来说是这样)。我认为这是一个需要填补的空白,除非我被迫错过了什么。如果我错了,请纠正我。这是一个真正的商业案例,而不是一些虚构的边缘案例。如果您不这么认为,请随时发表评论。

它可以工作,但我们所有的队列都在一个连接上共享 1 个通道。这对速度不利。我们想要的是一个连接,每个队列都有自己的通道。

如果切换到DirectMessageListenerContainer,该配置中的每个队列都有自己的Channel

请参阅文档。

要回答您的原始问题(预编辑):

@Bean
public SimpleMessageListenerContainer messageListenerContainer1(@Value("${address.queue.s1}") String queue)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(queue);
container.setMessageListener(messageListener());
// Set Exclusive Consumer 'ON'
container.setExclusive(true);
// Should be restricted to '1' to maintain data consistency.
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
...
@Bean
public SimpleMessageListenerContainer messageListenerContainer6(@Value("${address.queue.s6}" ) String queue)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(queue);
container.setMessageListener(messageListener());
// Set Exclusive Consumer 'ON'
container.setExclusive(true);
// Should be restricted to '1' to maintain data consistency.
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}

下面是用于创建 SimpleMessageListenerContainer 的 Java 配置

@Value("#{'${queue.names}'.split(',')}")
private String[] queueNames;
@Bean
public SimpleMessageListenerContainer listenerContainer(final ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueNames);
container.setMessageListener(vehiclesReceiver());
setCommonQueueProperties(container);
return container;
}

每个<rabbit:listener >都使用相同的ConnectionFactory创建自己的SimpleListenerContainerbean。要在 Java 配置中执行类似的操作,您必须声明与队列一样多的SimpleListenerContainerbean:每个队列一个。

您也可以考虑改用@RabbitListener方法:https://docs.spring.io/spring-amqp/docs/2.0.4.RELEASE/reference/html/_reference.html#async-annotation-driven

最新更新