Spring 与 ActiveMQ 集成:当队列没有更多待处理消息时收到通知



我正在使用与ActiveMQ的spring集成。当我的队列为空时,我需要做一些处理(不再有待处理的消息(。我已经实现了这样的事情,

package my.com.spring.integration;
public class MyListenerContainer extends DefaultMessageListenerContainer {
        @Override protected void messageReceived(Object invoker, Session session) {
            // I mark lastMessageReceived time here 
            super.messageReceived(invoker, session);
        }
        @Override protected void noMessageReceived(Object invoker, Session session) {
            // I wait for 1 minute from last message messageReceived and after 
            // that I consider that queue has no more messages now and
            // I start my stuff
            super.noMessageReceived(invoker, session);
        }
    }

我的弹簧xml看起来像这样,

<bean id="my.jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61616?jms.prefetchPolicy.queuePrefetch=250&jms.useAsyncSend=true" />
    <property name="optimizeAcknowledge" value="true" />
</bean>
<bean id="my.jms.cachedConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory"
    p:targetConnectionFactory-ref="my.jmsConnectionFactory"
    p:sessionCacheSize="10" p:reconnectOnException="true" />
 <bean id="myListenerContainer "
        class="my.com.spring.integration.MyListenerContainer ">
        <property name="connectionFactory"
            ref="my.jms.cachedConnectionFactory" />
        <property name="destination" ref="myQ" />
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="1" />
    </bean>
    <bean id="myMessageListener"
        class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener" />
    <bean id="myJmsEndpoint"
        class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
        <constructor-arg ref="myListenerContainer" />
        <constructor-arg ref="myMessageListener" />
        <property name="outputChannel" ref="myConsumerChannel" />
    </bean>

到目前为止,一切正常。当队列为空时,我会收到通知,并且我能够做我想要的事情。

问题:除了队列为空之外,还有其他原因不再接收消息吗?就像在 Spring 代码执行中发生任何异常一样,会发生什么? 这是实现我的目的的正确方法吗?如果没有,我该如何改进?

仅当未收到消息时才调用noMessageReceived,而不是在引发异常时调用。

这意味着DMLC

不会收到消息,但请记住,只有当您只有一个DMLC时,您的目的才是正确的。

因为 AMQ 向每个消费者发送大量消息(在您的情况下jms.prefetchPolicy.queuePrefetch=250(,并且每个消费者都在处理发送的消息,如果它断开连接而不确认它们,消息仍然可以分派给另一个消费者。

看看 http://activemq.apache.org/what-is-the-prefetch-limit-for.html

如果要确保从代理处获得有关队列的信息,则可以使用 JMX 来获取已调度消息的大小或数量。

import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
public class JMXGetDestinationInfos {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        Map<String, String[]> env = new HashMap<>();
        String[] creds = { "admin", "admin" };
        env.put(JMXConnector.CREDENTIALS, creds);
        JMXConnector jmxc = JMXConnectorFactory.connect(url, env);
        MBeanServerConnection conn = jmxc.getMBeanServerConnection();
        ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class,
                true);
        for (ObjectName name : mbean.getQueues()) {
            if (("myQ".equals(name.getKeyProperty("destinationName")))) {
                QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name,
                        QueueViewMBean.class, true);
                // ="Number of messages in the destination which are yet to be
                // consumed. Potentially dispatched but unacknowledged.")
                System.out.println(queueMbean.getQueueSize());
                // Returns the number of messages that have been delivered
                // (potentially not acknowledged) to consumers.
                System.out.println(queueMbean.getDispatchCount());
            }
        }
    }
}

最新更新