我正在使用spring-jms
编写一个JMS消费者来使用来自ActiveMQ Artemis代理的消息。我正在研究 3 个属性的行为:concurrentConsumers
、maxConcurrentConsumers
和idleConsumerLimit
。
预期行为是当队列中的负载增加时,使用者应扩展到maxConcurrentConsumers
,当负载减少时,使用者应缩小到idleConsumerLimit
。
但这并没有发生在spring-jms
和ActiveMQ Artemis身上。当队列中没有消息时,消费者计数不会下降,而是保留所有maxConcurrentConsumers
。
我正在使用弹簧DMLC容器
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="TestingInQueue" />
<property name="messageListener" ref="messageListener" />
<property name="concurrentConsumers" value="1"/>
<property name="maxConcurrentConsumers" value="20"/>
<property name="idleConsumerLimit" value="5"/>
</bean>
完整的上下文 XML:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
<prop key="java.naming.provider.url">tcp://localhost:61616</prop>
<prop key="java.naming.security.principal">admin</prop>
<prop key="java.naming.security.credentials">admin</prop>
</props>
</property>
</bean>
<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="jndiName" value="ConnectionFactory"/>
</bean>
<bean id="destinationQueue" class="org.apache.activemq.artemis.jms.client.ActiveMQQueue">
<constructor-arg index="0" value="TestingInQueue" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destinationQueue" />
</bean>
<bean id="messageListener" class="com.practise.SampleListener">
<property name="jmsTemplate" ref="jmsTemplate" />
<property name="queue" ref="destinationQueue" />
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="TestingInQueue" />
<property name="messageListener" ref="messageListener" />
<property name="concurrentConsumers" value="1"/>
<property name="maxConcurrentConsumers" value="20"/>
<property name="idleConsumerLimit" value="5"/>
</bean>
</beans>
注意:
maxConcurrentConsumers
: 负载下的最大消耗量concurrentConsumers
:如果你闲了一段时间,你最终会到达这里。(到达这里所需的时间取决于接下来的 3 个参数(
以下 3 个是为了避免消费者在maxConcurrentConsumers
和concurrentConsumers
之间大幅扩大和缩小,没有任何时间延迟。
idleConsumerLimit
maxMessagesPerTask
idleTaskExecutionLimit
例
如果从重负载开始,您将有 20(maxConcurrentConsumers
(个消费者。然后,如果你把队列定空,它将缩小到5(idleConsumerLimit
(。然后它将等待idleTaskExecutionLimit
尝试到达,然后缩小到 1 (concurrentConsumers
(。idleTaskExecutionLimit
是导致空抓取的抓取尝试次数,其中每次抓取需要receiveTimeout
(默认为 1000 毫秒(的时间单位。
问题:
默认值maxMessagesPerTask
为-1
,如果maxMessagesPerTask
不大于零idleConsumerLimit
则不会生效
溶液:
指定idleConsumerLimit
时,也指定maxMessagesPerTask
。
在此处指定 10 到 100 条消息的数量,以在相当长期和相当短暂的任务之间取得平衡,以便
maxMessagesPerTask
参考
- https://access.redhat.com/solutions/1412643
- 默认消息侦听器容器