在activemq . advisory . expired . queue中迭代非持久性activemq过期消息



我在activemq上构建一个应用程序,我从生产者那里发送消息,我的交付模式是NON_PERSISTENT (我不处理持久性交付模式,我知道它将存储在dlq中,这不是我的设计),并使用producer. settimeolive(2000)设置了消息的生存时间。功能提示消息将在2秒后过期。

我看到过期的消息在ActiveMQ.Advisory.Expired中排队。在activeMQ管理控制台的主题部分中排队,即http://localhost:8161/admin/topics.jsp.

我的问题是我如何迭代ActiveMQ.Advisory.Expired。排队,以便我可以访问过期消息的MessageID。任何代码示例都可以。

订阅目的地ActiveMQ.Advisory.Expired。Queue就像任何主题一样,它返回ActiveMQMessage。有数据结构对象(ConsumerInfo, ProducerInfo,ConnectionInfo…)可以通过ActiveMQMessage的getDataStructure方法检索。

doc http://activemq.apache.org/advisory-message.html

的例子:

Destination advisoryDestination = AdvisorySupport.getExpiredQueueMessageAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
public void onMessage(Message msg){
    String messageId =   msg.getJMSMessageID();
    String orignalMessageId =   msg.getStringProperty(org.apache.activemq.advisory.AdvisorySupport.MSG_PROPERTY_MESSAGE_ID);
    if (msg instanceof ActiveMQMessage){
        try {
             ActiveMQMessage aMsg =  (ActiveMQMessage)msg;
             ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
        } catch (JMSException e) {
            log.error("Failed to process message: " + msg);
        }
    }
}

最新更新