ActiveMQ 队列的活动使用者过多



My Spring 应用程序使用 ActiveMQ 队列。有两种可能的方法。ActiveMQ集成的初始部分对于这两种方法是相同的:

@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory();
}
@Bean
public Queue notificationQueue() {
return resolveAvcQueueByJNDIName("java:comp/env/jms/name.not.important.queue");
}

单线程方法:

@Bean
public IntegrationFlow orderNotify() {
return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory()).destination(notificationQueue()),
c -> c.poller(Pollers.fixedDelay(QUEUE_POLLING_INTERVAL_MS)
.errorHandler(e -> logger.error("Can't handle incoming message", e))))
.handle(...).get();
}

但是我想使用多个工作线程使用消息,因此我将代码从入站适配器重构为消息驱动的通道适配器:

@Bean
public IntegrationFlow orderNotify() {
return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory()).configureListenerContainer(c -> {
final DefaultMessageListenerContainer container = c.get();
container.setMaxConcurrentConsumers(notifyThreadPoolSize);
}).destination(notificationQueue()))
.handle(...).get();
}

问题在于,当应用程序重新部署到Tomcat中或重新启动第二种方法时,该应用程序不会停止ActiveMQ的使用者。它在启动期间创造了新的消费者。但是所有新消息都会路由到旧的"死"消费者,因此它们位于"待处理消息"部分,永远不会被取消排队。

这里可能有什么问题?

我相信你必须完全阻止Tomcat。通常在应用程序的重新部署期间,Spring 容器应该停止并正确清除,但看起来情况并非如此:Tomcat 重新部署钩子缺少一些东西。因此,我建议完全停止它。

另一种选择是忘记外部Tomcat,而只是迁移到Spring Boot,因为它能够启动嵌入式servlet容器。这样,在重建和重新启动应用程序后不会有任何泄漏。

最新更新