远程MessageListener的Camel JMS请求回复问题



我正在测试将JMS请求/回复与Camel和ActiveMQ一起使用的示例。当camel为您创建监听器时,我可以让这个例子发挥作用。即

from("direct:entryPoint").inOut("jms:queue:A");
from("jms:queue:A").
    process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Hello World.");
        }
    });



我现在遇到的问题是,我无法让JMS请求/回复与Camel jvm之外的MessageListener一起工作。等待答复时连接超时。我确保MessageListener正在向replyTo队列发送回复,并且我还在设置correlationId。我在这里做错了什么?我在谷歌上搜索了好几天,想弄清楚这件事,但运气不好。提前感谢你的帮助。

下面是我正在使用的路由,我还将MessageListener逻辑放在下面。

from("direct:entryPoint").
  inOut("jms:queue:B?concurrentConsumers=4&requestTimeout=240000");

队列B:的MessageListener onMessage

@Override
public void onMessage(Message message) {
    String msg = null;
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true);
    String jsonOutput = null;
    try{
        msg = ((TextMessage) message).getText();
        //convert message payload to purchase order
        PurchaseOrder order = mapper.readValue(msg, PurchaseOrder.class);
        //Set the id to see if the request reply worked.
        order.setOrderId(BigInteger.valueOf(111111111));

        if(message.getJMSReplyTo() != null){
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("JMSCorrelationID", message.getJMSCorrelationID());
            headers.put("JMSReplyTo", message.getJMSReplyTo().toString());  
            jsonOutput = mapper.writeValueAsString(order);
            //Camel runs on the external jvm so leverage the producerTemplate.
            producerTemplate.
              sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(), 
              jsonOutput, headers);
        }
    }
    catch(Exception e){
        logger.fatal(e.getMessage());
        try {
            if(message.getJMSReplyTo() != null){
                Map<String, Object> headers = new HashMap<String, Object>();
                headers.put("JMSCorrelationID", message.getJMSCorrelationID());
                producerTemplate.
                 sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(),
 e.getMessage(), headers);
            }
        } catch (JMSException e1) {
            logger.fatal(e1.getMessage());
        } catch (Exception e1) {
            logger.fatal(e1.getMessage());
        }
    }
}
sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(), 
          jsonOutput, headers);

通常会解析为jms:queue://someQueue,这可能会把事情搞砸。使用转换为字符串的javax.jms.Destination通常不是一个好主意,除非您采取预处理。

您可以使用Camel标头CamelJmsDestination:headers.put("CamelJmsDestination",message.getJMSReplyTo());

这是否有效,我不知道。通常,尝试将ActiveMQ与Web控制台一起使用(或通过JMX(jconsole(连接到ActiveMQ(来查看队列,并尝试弄清楚谁在读取哪个队列以及消息的最终位置。这真的很有帮助。

最新更新