Apache骆驼路由中的Activemq并发失败



试图在同一时刻向camel-activemq路由发送多个请求,一个请求得到服务,另一个请求不得到服务并按原样发送。Jms消息在发送之前也设置了JMScorrelationId,如下所示

textMessage.setJMSCorrelationID(UUID.randomUUID().toString());

下面是我的activemq路由

from("activemq:queue:TEST_QUEUE?disableReplyTo=true")
                .setExchangePattern(ExchangePattern.InOut)
                .process(new Processor() {
                    public void process(Exchange e) throws Exception {
                        log.info("Request : "
                                + MessageHelper.extractBodyAsString(e.getIn()));
                        /*Processing Logic*/
                    }
                })
                .beanRef("testBean","postDetails")
                .inOnly("activemq:queue:TEST_QUEUE");

同时发送到上述路由的多个(针对2个请求进行测试)请求除一个请求外未得到服务。servicemix.log显示所有收到的请求。但只有一个得到了服务。

以下是作为web应用程序的一部分部署在jboss 6.1中的发送请求的代码。

public Message receive(String message, String queueName) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        String userName = "smx";
        String password = "smx";
        Connection connection;
        Message response =null;
        try {
            connection = connectionFactory.createConnection(userName, password);
            connection.start();
            ((ActiveMQConnectionFactory) connectionFactory)
                    .setDispatchAsync(false);
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage textMessage = session.createTextMessage(message);
            Queue tempQueue = session.createQueue(queueName);
            textMessage.setJMSReplyTo(tempQueue);
            producer.send(textMessage);
            MessageConsumer consumer = session.createConsumer(tempQueue);
            response = consumer.receive();
            response.acknowledge();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return response;
    }

是否缺少某些或其他参数??请提出建议。

如果JMS消息具有JMSReplyTo头,Camel将自动发回回复,因此您的路由应该只是

from("activemq:queue:TEST_QUEUE")
                .process(new Processor() {
                    public void process(Exchange e) throws Exception {
                        log.info("Request : "
                                + MessageHelper.extractBodyAsString(e.getIn()));
                        /*Processing Logic*/
                    }
                })
                .beanRef("testBean","postDetails");

在路由的末尾(例如,在调用testBean之后),消息主体的内容被用作回复消息,这些消息被发送回JMSReplyTo标头中定义的名为的队列。

最新更新