突然停止应用程序服务器时,使用Spring JMS和ActiveMQ丢失JMS消息



我有一个Spring JMS应用程序,该应用程序具有JMS侦听器,该应用程序连接到应用程序启动时的活动MQ队列。该JMS侦听器是一个应用程序的一部分,该应用程序传达消息,用内容丰富它,然后将其交付到同一ActiveMQ Broker上的主题中。

会话传输设置为true。我没有执行任何数据库交易,因此我在任何地方都没有@transactional设置。从我阅读的内容来看,Session TransActed属性会在JMS侦听器的接收方法周围设置本地交易,因此在交易完成之前,它不会从队列中删除消息。我已经使用局部ActiveMQ实例和本地Tomcat容器对此进行了测试,并按照预期的方式进行了测试。

但是,当我部署到我们的perf环境并重试相同的测试时,我注意到当前在关闭服务器关闭时在飞行中的消息在完成接收方法之前从队列中拉出。

我想知道的是,如果我应该寻找明显的东西吗?是否有某些JMS标题会导致这种行为发生?请让我知道是否还有我可以提供的信息。

我正在使用apache Activemq 5.8.0的弹簧4.1.2.Release,在tomcat 7运行Java 8的容器上。

更新 - 添加我的Java JMS配置。请注意,为了清楚起见,我将我在perf属性文件中的内容替换为相关区域。

    @Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMaxMessagesPerTask(-1);
    factory.setConcurrency(1);
    factory.setSessionTransacted(Boolean.TRUE);
    return factory;
}
@Bean
public CachingConnectionFactory connectionFactory(){
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(1000);
    redeliveryPolicy.setRedeliveryDelay(1000);
    redeliveryPolicy.setMaximumRedeliveries(6);
    redeliveryPolicy.setUseExponentialBackOff(Boolean.TRUE);
    redeliveryPolicy.setBackOffMultiplier(5);
    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("queue.username"), environment.getProperty("queue.password"), environment.getProperty("jms.broker.endpoint"));
    activeMQ.setRedeliveryPolicy(redeliveryPolicy);
    activeMQ.setPrefetchPolicy(prefetchPolicy());
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
    cachingConnectionFactory.setCacheConsumers(Boolean.FALSE);
    cachingConnectionFactory.setSessionCacheSize(1);
    return cachingConnectionFactory;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("queue.out"));
    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
    jmsMessagingTemplate.setDefaultDestination(activeMQ);
    return jmsMessagingTemplate;
}
protected ActiveMQPrefetchPolicy prefetchPolicy(){
    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    int prefetchValue = 0; 
    prefetchPolicy.setQueuePrefetch(prefetchValue);
    return prefetchPolicy;
}

谢谢,

胡安

事实证明,我们的应用程序中有不同的版本。应用程序更新后,然后按预期工作。

相关内容

  • 没有找到相关文章

最新更新