我正在使用与ActiveMQ的spring集成。当我的队列为空时,我需要做一些处理(不再有待处理的消息(。我已经实现了这样的事情,
package my.com.spring.integration;
public class MyListenerContainer extends DefaultMessageListenerContainer {
@Override protected void messageReceived(Object invoker, Session session) {
// I mark lastMessageReceived time here
super.messageReceived(invoker, session);
}
@Override protected void noMessageReceived(Object invoker, Session session) {
// I wait for 1 minute from last message messageReceived and after
// that I consider that queue has no more messages now and
// I start my stuff
super.noMessageReceived(invoker, session);
}
}
我的弹簧xml看起来像这样,
<bean id="my.jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616?jms.prefetchPolicy.queuePrefetch=250&jms.useAsyncSend=true" />
<property name="optimizeAcknowledge" value="true" />
</bean>
<bean id="my.jms.cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="my.jmsConnectionFactory"
p:sessionCacheSize="10" p:reconnectOnException="true" />
<bean id="myListenerContainer "
class="my.com.spring.integration.MyListenerContainer ">
<property name="connectionFactory"
ref="my.jms.cachedConnectionFactory" />
<property name="destination" ref="myQ" />
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="1" />
</bean>
<bean id="myMessageListener"
class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener" />
<bean id="myJmsEndpoint"
class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
<constructor-arg ref="myListenerContainer" />
<constructor-arg ref="myMessageListener" />
<property name="outputChannel" ref="myConsumerChannel" />
</bean>
到目前为止,一切正常。当队列为空时,我会收到通知,并且我能够做我想要的事情。
问题:除了队列为空之外,还有其他原因不再接收消息吗?就像在 Spring 代码执行中发生任何异常一样,会发生什么? 这是实现我的目的的正确方法吗?如果没有,我该如何改进?
仅当未收到消息时才调用noMessageReceived
,而不是在引发异常时调用。
不会收到消息,但请记住,只有当您只有一个DMLC时,您的目的才是正确的。
因为 AMQ 向每个消费者发送大量消息(在您的情况下jms.prefetchPolicy.queuePrefetch=250
(,并且每个消费者都在处理发送的消息,如果它断开连接而不确认它们,消息仍然可以分派给另一个消费者。
看看 http://activemq.apache.org/what-is-the-prefetch-limit-for.html
如果要确保从代理处获得有关队列的信息,则可以使用 JMX 来获取已调度消息的大小或数量。
import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
public class JMXGetDestinationInfos {
public static void main(String[] args) throws Exception {
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
Map<String, String[]> env = new HashMap<>();
String[] creds = { "admin", "admin" };
env.put(JMXConnector.CREDENTIALS, creds);
JMXConnector jmxc = JMXConnectorFactory.connect(url, env);
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class,
true);
for (ObjectName name : mbean.getQueues()) {
if (("myQ".equals(name.getKeyProperty("destinationName")))) {
QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name,
QueueViewMBean.class, true);
// ="Number of messages in the destination which are yet to be
// consumed. Potentially dispatched but unacknowledged.")
System.out.println(queueMbean.getQueueSize());
// Returns the number of messages that have been delivered
// (potentially not acknowledged) to consumers.
System.out.println(queueMbean.getDispatchCount());
}
}
}
}