jakarta ee - JMS 消息接收器通过 JMSCorrelationID 过滤



如何在java(JRE/JDK/J2EE 1.4)中实例化仅接收与给定JMSCorrelationID匹配的消息的JMS队列侦听器? 我要接收的消息已发布到队列而不是主题,尽管如果需要,可以更改。

以下是我当前用于将消息放入队列的代码:
/**
 * Publishes a response to a JMS queue as an ObjectMessage.
 * The message's JMSCorrelationID is set to <code>response.getID()</code> so that
 * a receiver can pick it out with a message selector.
 *
 * @param   jmsQueueFactory             -JNDI name of the queue-connection-factory
 * @param   jmsQueue                    -JNDI name of the queue the response is sent to
 * @param   response                    -The response object that needs to be published
 *
 * @throws  ServiceLocatorException     -If the lookup, the connection setup, the send
 *                                      or the final close fails for any reason
 */
private void publishResponseToQueue( String jmsQueueFactory,
                                    String jmsQueue,
                                    Response response )
        throws ServiceLocatorException {
    if ( logger.isInfoEnabled() ) {
        logger.info( "Begin publishResponseToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + "," + response );
    }
    logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
                      "jmsQueue cannot be null" );
    logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
                      "jmsQueueFactory cannot be null" );
    logger.assertLog( response != null, "Response cannot be null" );
    try {
        Queue queue = (Queue)_context.lookup( jmsQueue );
        QueueConnectionFactory factory = (QueueConnectionFactory)
            _context.lookup( jmsQueueFactory );
        QueueConnection connection = factory.createQueueConnection();
        try {
            connection.start();
            QueueSession session = connection.createQueueSession( false,
                                        QueueSession.AUTO_ACKNOWLEDGE );
            ObjectMessage objectMessage = session.createObjectMessage();
            objectMessage.setJMSCorrelationID( response.getID() );
            objectMessage.setObject( response );
            session.createSender( queue ).send( objectMessage );
        } finally {
            // Always release the connection, even when the send fails.
            // Closing a JMS connection also closes the sessions and
            // producers created from it, so no separate session.close()
            // is needed. A failure here is handled by the catch below.
            connection.close();
        }
    } catch ( Exception e ) {
        //XC3.2  Added/Modified BEGIN
        logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
                                           "- Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        //XC3.2  Added/Modified END
    }
    if ( logger.isInfoEnabled() ) {
        logger.info( "End publishResponseToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + "," + response );
    }
}  // end of publishResponseToQueue method 

队列连接设置是相同的,但是一旦您拥有了 QueueSession,就可以在创建接收器时设置选择器。

    // Message selector: only messages whose JMSCorrelationID header equals 'theid' are delivered.
    String selector = "JMSCorrelationID='theid'";
    QueueReceiver receiver = session.createReceiver(myQueue, selector);

然后调用

receiver.receive()

或者

receiver.setMessageListener(myListener);
顺便说一句,

虽然这不是您提出的实际问题 - 如果您尝试通过 JMS 实现请求响应,我建议您阅读本文,因为 JMS API 比您想象的要复杂得多,并且有效地执行此操作比看起来要困难得多。

特别是为了有效地使用 JMS,您应该尽量避免为单个消息等创建消费者。

另外,由于 JMS API 非常复杂,很难正确、高效地使用,特别是在池化、事务和并发处理方面,我建议在应用程序代码中把中间件隐藏起来,例如使用 Spring Remoting,以及 Apache Camel 为 JMS 提供的 Spring Remoting 实现。

希望这会有所帮助。我使用了 Open MQ。

package com.MQueues;
import java.util.UUID;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.BasicQueue;
import com.sun.messaging.QueueConnectionFactory;
/**
 * Small Open MQ demo: sends text messages tagged with a JMSCorrelationID to a
 * queue and then receives them back selectively, using a message selector on
 * that correlation ID.
 */
public class HelloProducerConsumer {
    /** Name of the queue used by {@link #main(String[])} and {@link #receivemessage(String)}. */
    public static String queueName = "queue0";

    /** Correlation ID generated by the most recent {@link #sendMessage(String)} call. */
    public static String correlationId;

    public static String getCorrelationId() {
        return correlationId;
    }

    public static void setCorrelationId(String correlationId) {
        HelloProducerConsumer.correlationId = correlationId;
    }

    public static String getQueueName() {
        return queueName;
    }

    /**
     * Sends one text message to the given queue, tagged with a freshly
     * generated random UUID as its JMSCorrelationID. The generated ID is stored
     * in {@link #correlationId} so the caller can read it afterwards.
     * JMS errors are printed to standard output and not propagated.
     *
     * @param threadName name of the destination queue
     */
    public static void sendMessage(String threadName) {
        correlationId = UUID.randomUUID().toString();
        QueueConnection connection = null;
        try {
            // Start connection
            QueueConnectionFactory cf = new QueueConnectionFactory();
            connection = cf.createQueueConnection();
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            BasicQueue destination = (BasicQueue) session.createQueue(threadName);
            MessageProducer producer = session.createProducer(destination);
            connection.start();
            // create message to send
            TextMessage message = session.createTextMessage();
            message.setJMSCorrelationID(correlationId);
            message.setText(threadName + "(" + System.currentTimeMillis()
                    + ") " + correlationId + " from Producer");
            System.out.println(correlationId + " Send from Producer");
            producer.send(message);
        } catch (JMSException ex) {
            System.out.println("Error = " + ex.getMessage());
        } finally {
            // Runs on failure too, so the connection is never leaked.
            closeConnection(connection);
        }
    }

    /**
     * Blocks until a message whose JMSCorrelationID equals the given ID is
     * available on the queue named by {@link #getQueueName()}, then prints it.
     * JMS errors are printed to standard output and not propagated.
     *
     * @param correlationId correlation ID of the message to receive
     */
    public static void receivemessage(final String correlationId) {
        QueueConnection connection = null;
        try {
            QueueConnectionFactory cf = new QueueConnectionFactory();
            connection = cf.createQueueConnection();
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());
            connection.start();
            System.out.println("\n");
            System.out.println("Start listen " + getQueueName() + " " + correlationId + " Queue from receivemessage");
            // receive our message
            QueueReceiver receiver = session.createReceiver(destination, correlationSelector(correlationId));
            TextMessage m = (TextMessage) receiver.receive();
            System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());
            System.out.println("End listen " + getQueueName() + " " + correlationId + " Queue from receivemessage");
        } catch (JMSException ex) {
            System.out.println("Error = " + ex.getMessage());
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Builds the selector {@code JMSCorrelationID = '<id>'}. Single quotes in
     * the ID are doubled, which is how the selector syntax represents a quote
     * inside a string literal; without this an ID containing a quote would
     * produce an invalid (or different) selector.
     */
    private static String correlationSelector(String id) {
        return "JMSCorrelationID = '" + String.valueOf(id).replace("'", "''") + "'";
    }

    /**
     * Closes the connection if one was opened. Closing a connection also closes
     * the sessions, producers and consumers created from it. A failure to close
     * is reported the same way as any other JMS error in this class.
     */
    private static void closeConnection(QueueConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ex) {
            System.out.println("Error = " + ex.getMessage());
        }
    }

    /**
     * Sends three messages, then receives them in a different order (second,
     * first, third) to show that the selector picks messages by correlation ID
     * rather than by queue position.
     */
    public static void main(String args[]) {
        HelloProducerConsumer.sendMessage(getQueueName());
        String correlationId1 = getCorrelationId();
        HelloProducerConsumer.sendMessage(getQueueName());
        String correlationId2 = getCorrelationId();
        HelloProducerConsumer.sendMessage(getQueueName());
        String correlationId3 = getCorrelationId();

        HelloProducerConsumer.receivemessage(correlationId2);
        HelloProducerConsumer.receivemessage(correlationId1);
        HelloProducerConsumer.receivemessage(correlationId3);
    }
}
// Select only messages whose JMSCorrelationID equals the JMSMessageID of msg.
String selector = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
QueueReceiver receiver = session.createReceiver(queue, selector);

在这里,接收方将获得JMSCorrelationID等于 MessageID 的消息。 这在请求/响应范式中非常有帮助。

或者您可以直接将其设置为任何值:

QueueReceiver receiver = session.createReceiver(queue,  "JMSCorrelationID ='"+id+"'";);

之后你可以调用 receiver.receive(2000); 或者 receiver.setMessageListener(this);

相关内容

  • 没有找到相关文章

最新更新