我们正在尝试使用spring jms为ActiveMQ设置一个重新交付策略。我们已经为重新交付设置了指数回退,但它似乎被忽略了——消息重新交付之间的间隔是固定的,而不是指数增长。
有谁知道可能是什么问题吗?这是我们的spring-jms配置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="${activemq_url}">
<property name="redeliveryPolicy" ref="redeliveryPolicy" />
</bean>
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="queue" value="*" />
<property name="initialRedeliveryDelay" value="10000" />
<property name="redeliveryDelay" value="10000" />
<property name="maximumRedeliveries" value="-1" />
<property name="useExponentialBackOff" value="true" />
<property name="backOffMultiplier" value="5" />
</bean>
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="connectionFactory" p:sessionCacheSize="10"
/>
<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="messageConverter" ref="messageConverter" />
<property name="sessionTransacted" value="true" />
</bean>
<!-- The Spring message listener container configuration -->
<jms:listener-container container-type="default"
destination-type="queue" connection-factory="connectionFactory"
acknowledge="transacted" concurrency="1" cache="consumer">
<jms:listener destination="testQueue" ref="testService"
method="onMessage" />
</jms:listener-container>
谢谢!
编辑:这是一个日志示例,每5秒重新交付一次:
11 May 2014 18:52:00 WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:00 IDT 2014
at ...
11 May 2014 18:52:05 WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:05 IDT 2014
at ...
11 May 2014 18:52:10 WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:10 IDT 2014
at ...
11 May 2014 18:52:15 WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:15 IDT 2014
at ...
11 May 2014 18:52:20 WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:20 IDT 2014
at ...
所以,我想我找到了问题:当我之前测试策略时,我抛出了JMSException以获得要重新交付的消息。
我将抛出的异常更改为exception/RuntimeException,指数回退工作。
我不确定为什么JMSException导致指数回退策略被忽略…有人有什么想法吗?
为了让@tyfyh的评论更突出一点,让我们将其展开为一个答案。通过将缓存级别设置为CACHE_CONSUMER,这意味着将缓存JMS消费者(消息接收者),从而允许将其用于多个消息交付。这种缓存行为可能会影响重新交付行为和异常处理,并修复上述问题。
你可以自己定义DefaultJmsListenerContainerFactory:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(PlatformTransactionManager txManager, ConnectionFactory connectionFactory) {
var factory = new DefaultJmsListenerContainerFactory();
factory.setTransactionManager(txManager);
factory.setConnectionFactory(connectionFactory);
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
return factory;
}
或者使用现有bean,只设置缓存级别:
@Configuration
public class JmsListenerConfig {
private final DefaultJmsListenerContainerFactory factory;
public JmsListenerConfig(DefaultJmsListenerContainerFactory factory) {
this.factory = factory;
}
@PostConstruct
public void customizeJmsListenerContainerFactory() {
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
}
}