如何在建立JMS连接时捕获



我有消息生产者正在使用ActiveMQ发送关于某些事件的JMS消息。但是,与ActiveMQ的连接可能不是一直都是正常的。因此,事件被存储,当连接建立时,它们应该被读取和发送。下面是我的代码:

private void sendAndSave(MyEvent event) {
    boolean sent = sendMessage(event);
    event.setProcessed(sent);
    boolean saved = repository.saveEvent(event);
    if (!sent && !saved) {
        logger.error("Change event lost for Id = {}", event.getId());
    }
}
private boolean sendMessage(MyEvent event) {
    try {
        messenger.publishEvent(event);
        return true;
    } catch (JmsException ex) {
        return false;
    }
}

我想创建某种类型的ApplicationEventListener,当连接建立并处理未发送的事件时将被调用。我通读了JMS、Spring框架和ActiveMQ文档,但找不到任何线索如何将我的监听器与ConnectionFactory连接起来。

如果有人能帮我,我会非常感激的。

下面是我的应用Spring上下文对JMS的描述:

<!-- Connection factory to the ActiveMQ broker instance.              -->
<!-- The URI and credentials must match the values in activemq.xml    -->
<!-- These credentials are shared by ALL producers.                   -->
<bean id="jmsTransportListener" class="com.rhd.ams.service.common.JmsTransportListener" 
      init-method="init" destroy-method="cleanup"/>
<bean id="amqJmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${jms.publisher.broker.url}"/>
    <property name="userName" value="${jms.publisher.username}"/>
    <property name="password" value="${jms.publisher.password}"/>
    <property name="transportListener" ref="jmsTransportListener"/>
</bean>
<!-- JmsTemplate, by default, will create a new connection, session, producer for         -->
<!-- each message sent, then close them all down again. This is very inefficient!         -->
<!-- PooledConnectionFactory will pool the JMS resources. It can't be used with consumers.-->
<bean id="pooledAmqJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="amqJmsConnectionFactory" />
</bean>
<!-- Although JmsTemplate instance is unique for each message, it is  -->
<!-- thread-safe and therefore can be injected into referenced obj's. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="pooledAmqJmsConnectionFactory"/>
</bean>

您描述这个问题的方式,听起来确实像是JMS持久订阅的一个明确的案例。在走这条路之前,您可能需要考虑更传统的实现。除了注意事项外,ActiveMQ还提供了可以侦听的咨询消息,这些消息将在包括新连接在内的各种事件中发送。

=========

拍摄,对不起……我不明白问题是什么。我不认为咨询是解决方案....毕竟,您需要连接到代理才能获得它们,但是连接是您所知道的。

所以如果我理解正确的话(准备重试#2....),你需要的是一个客户端连接,当它失败时,尝试无限期地重新连接。当它重新连接时,您希望触发一个(或多个)事件,将挂起的消息刷新到代理。

所以检测丢失的连接很容易。您只需注册一个JMS ExceptionListener。就检测重新连接而言,我能想到的最简单的方法是启动一个重新连接线程。当它连接时,停止重新连接线程,并使用Observer/Observable或JMX通知或类似的方式通知感兴趣的各方。您可以使用ActiveMQ故障转移传输,它将为您执行连接重试循环,即使您只有一个代理。至少,它应该这样做,但它并没有为您做那么多,而不是由您自己的重新连接线程完成…但是如果你愿意委托一些控制给它,它会缓存你未刷新的消息(参见trackMessages选项),然后在它重新连接时发送它们,这是你想要做的所有事情。

我想,如果你的代理宕机了几分钟,这是一个不错的选择,但是如果你需要几个小时,或者你可能在宕机期间积累了1万多条消息,我只是不知道缓存机制是否像你所需要的那样可靠。

==================

手机应用…正确的。并不真正适合于故障转移传输。然后我将实现一个定时连接的计时器(使用http传输可能是一个好主意,但不相关)。当它连接上时,如果没有什么要冲洗的,那么x分钟后见。如果有,发送每条消息,等待握手,并从您的移动存储中清除消息。x分钟后再见。

我猜这是Android ?如果没有,请停止阅读。实际上我们在一段时间前实现了这个。我只做了服务器端,但如果我没记错的话,连接计时器/轮询器每n分钟旋转一次(我认为是可变频率,因为过于激进会耗尽电池)。一旦成功建立了连接,我相信他们会使用意图广播来推动消息推送者去做他们的事情。我们的想法是,即使只有一个消息推送器,我们也可以添加更多。

最新更新