如何使用 Spring 的 DefaultMessageListenerContainer 定位 ActiveMQ Artemis 集群的所有节点



通过Spring的DefaultJmsListenerContainerFactory连接到ActiveMQ Artemis集群(实际上是Red Hat的AMQ(时遇到问题。

DefaultMessageListenerContainer只使用一个连接,而不考虑通过并发参数指定的使用者数量。问题是,在集群中,目前配置了3个代理(作为开发人员,我不应该关心集群的拓扑结构(。由于这里只有一个连接,所以消费者只监听一个代理。

为了解决这个问题,我禁用了缓存(即工厂中的setCacheLevel(CACHE_NONE)(。它";已解决";这个问题是因为现在我可以看到连接分布在集群的所有节点上,但这不是一个好的解决方案,因为连接总是被丢弃和重新创建,这在代理端造成了很大的开销(这让我想到了圣诞树:D(。

你们能告诉我处理这个问题的正确方法是什么吗?我尝试使用JmsPoolConnectionFactory,但到目前为止没有得到任何好的结果。我仍然只有一个联系。

我使用的是Spring Boot 2.7.4和Artemis Starter。您可以在下面找到实际配置的代码片段。

(附带说明,我不使用Spring自动配置,因为我需要能够在ActiveMQ Artemis和旧的ActiveMQ"Classic"实现之间切换(。

@Bean
DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setSessionTransacted(true);
factory.setConcurrency(config.getConcurrency());
//Set this to allow load balancing of connections to all members of the cluster
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
final ExponentialBackOff backOff = new ExponentialBackOff(
config.getRetry().getInitialInterval(), config.getRetry().getMultiplier());
backOff.setMaxInterval(config.getRetry().getMaxDuration());
factory.setBackOff(backOff);
return factory;
}
ConnectionFactory connectionFactory() {
return new ActiveMQJMSConnectionFactory(
config.getUrl(), config.getUser(), config.getPassword());
}
DestinationResolver destinationResolver() {
final ActiveMQQueue activeMQQueue = new ActiveMQQueue(config.getQueue());
return (session, destinationName, pubSubDomain) -> activeMQQueue;
}

@JmsListener(destination = "${slp.amq.queue}")
public void processLog(String log) {
final SecurityLog securityLog = SecurityLog.parse(log);
fileWriter.write(securityLog);
logsCountByApplicationId.increment(securityLog.getApplicationId());
if (elasticClient != null) {
elasticClient.write(securityLog);
}
}

连接URL为:

(tcp://broker1:port,tcp://broker2:port,tcp://broker3:port)?useTopologyForLoadBalancing=true

可以配置集群,以便任何节点上的任何使用者都可以使用发送到任何节点的消息。因此,你不应该严格要求;以所有节点为目标";与您的消费者共享集群的。群集中的消息重新分发和重新路由对您的应用程序应该是透明的。正如您所说,作为一名开发人员,您不应该关心集群的拓扑结构。

也就是说,集群的目标是通过水平扩展来提高整体消息吞吐量(即性能(。此外,理想情况下,集群中的每个节点都应该有足够的生产者和消费者,这样消息就不会在集群节点之间重新分发或路由,因为这对性能来说不是最佳的。如果您的情况是只有少数消费者连接到集群,那么您很可能根本不需要集群。在某些用例中,单个ActiveMQ Artemis代理每秒可以处理数百万条消息。

相关内容

最新更新