Spring JMS——忽略指数级消息重新交付策略



我们正在尝试使用spring jms为ActiveMQ设置一个重新交付策略。我们已经为重新交付设置了指数回退,但它似乎被忽略了——消息重新交付之间的间隔是固定的,而不是指数增长。

有谁知道可能是什么问题吗?这是我们的spring-jms配置:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="${activemq_url}">
    <property name="redeliveryPolicy" ref="redeliveryPolicy" />
</bean>
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="queue" value="*" />
    <property name="initialRedeliveryDelay" value="10000" />
    <property name="redeliveryDelay" value="10000" />
    <property name="maximumRedeliveries" value="-1" />
    <property name="useExponentialBackOff" value="true" />
    <property name="backOffMultiplier" value="5" />
</bean>
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" 
    p:targetConnectionFactory-ref="connectionFactory" p:sessionCacheSize="10" 
    />
<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="messageConverter" ref="messageConverter" />
    <property name="sessionTransacted" value="true" />
</bean>
<!-- The Spring message listener container configuration -->
<jms:listener-container container-type="default"
    destination-type="queue" connection-factory="connectionFactory"
    acknowledge="transacted" concurrency="1" cache="consumer">
    <jms:listener destination="testQueue" ref="testService"
        method="onMessage" />
</jms:listener-container>

谢谢!

编辑:这是一个日志示例,每5秒重新交付一次:

11 May 2014 18:52:00  WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:00 IDT 2014
at ...
11 May 2014 18:52:05  WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:05 IDT 2014
at ...
11 May 2014 18:52:10  WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:10 IDT 2014
at ...
11 May 2014 18:52:15  WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:15 IDT 2014
at ...
11 May 2014 18:52:20  WARN DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: Sun May 11 18:52:20 IDT 2014
at ...

所以,我想我找到了问题:当我之前测试策略时,我抛出了JMSException以获得要重新交付的消息。

我将抛出的异常更改为exception/RuntimeException,指数回退工作。

我不确定为什么JMSException导致指数回退策略被忽略…有人有什么想法吗?

为了让@tyfyh的评论更突出一点,让我们将其展开为一个答案。通过将缓存级别设置为CACHE_CONSUMER,这意味着将缓存JMS消费者(消息接收者),从而允许将其用于多个消息交付。这种缓存行为可能会影响重新交付行为和异常处理,并修复上述问题。

你可以自己定义DefaultJmsListenerContainerFactory:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(PlatformTransactionManager txManager, ConnectionFactory connectionFactory) {
  var factory = new DefaultJmsListenerContainerFactory();
  factory.setTransactionManager(txManager);
  factory.setConnectionFactory(connectionFactory);
  factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
  return factory;
}

或者使用现有bean,只设置缓存级别:

@Configuration
public class JmsListenerConfig {
  private final DefaultJmsListenerContainerFactory factory;
  public JmsListenerConfig(DefaultJmsListenerContainerFactory factory) {
    this.factory = factory;
  }
  @PostConstruct
  public void customizeJmsListenerContainerFactory() {
    factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
  }
}

最新更新