Apache Camel dynamic router causes an infinite loop



Question

  • Using dynamicRouter() results in an "infinite" loop.
  • Using producerTemplate.send(endpoint, exchange) appears to work, but it blocks inside org.apache.camel.processor.UnitOfWorkProducer.process(Exchange) because of a CountDownLatch.

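Am I using the ProducerTemplate.send method incorrectly? How can I fix the "infinite" loop with dynamicRouter()? Is anything wrong with the configuration below?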

Example route definition

private static final String inboundUri = "jms:request";
private static final String replyUri = "jms:reply";
private static final String subReply = "jms:subReply";
private static final String DEFAULT_ENDPOINT = "DEFAULT_ENDPOINT";

// Route Configuration
from(inboundUri).routeId("generic-jms-inbound").setExchangePattern(ExchangePattern.InOnly).threads().bean(dummyProcessor)
.setHeader(DEFAULT_ENDPOINT, constant("direct:main"))
.process(exchange -> {
    ProducerTemplate producerTemplate = exchange.getContext().createProducerTemplate();
    producerTemplate.send("direct:main", exchange);
})
//---------------------------
//.dynamicRouter().header("DEFAULT_ENDPOINT")  <== This causes "infinite" loop
//---------------------------
;
from("direct:main").setExchangePattern(ExchangePattern.InOnly).threads().bean(dummyProcessor).wireTap("direct:wireTap").to(replyUri);
from("direct:wireTap").setExchangePattern(ExchangePattern.InOnly).threads().bean(MessageHistoryLogger.class).to(subReply);

In the real application, the exception log looks like this:

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[generic-jms-inboun] [generic-jms-inboun] [jms://terminalRequest                                                         ] [     94714]
[generic-jms-inboun] [setExchangePattern] [setExchangePattern[InOnly]                                                    ] [         0]
[generic-jms-inboun] [threads2          ] [threads                                                                       ] [     94714]
[generic-jms-inboun] [bean9             ] [bean[com.xxxxxxxxxxxx.integration.camel.dispatch.MessageUriResolver@bc0f53b]  ] [         0]
[generic-jms-inboun] [dynamicRouter1    ] [dynamicRouter[header{header(defaultEndPointUri)}]                             ] [     94714]
[sellTicket        ] [setExchangePattern] [setExchangePattern[InOnly]                                                    ] [         0]
[sellTicket        ] [threads4          ] [threads                                                                       ] [         5]
[sellTicket        ] [bean14            ] [bean[com.xxxxxxxxxxxx.integration.camel.processor.TicketProcessor@76c548f]    ] [         0]
[sellTicket        ] [bean17            ] [bean[com.xxxxxxxxxxxx.integration.camel.processor.PersistTicketProcessor@7d061] [         0]
[sellTicket        ] [wireTap1          ] [wireTap[direct:ceRequest]                                                     ] [         5]
[sellTicket        ] [bean18            ] [bean[com.xxxxxxxxxxxx.integration.camel.transformer.TerminalReplyTransformer@5] [         0]
[sellTicket        ] [bean19            ] [bean[com.xxxxxxxxxxxx.integration.camel.processor.LatencyMeasuringProcessor@62] [         0]
[sellTicket        ] [to2               ] [jms:terminalReply                                                             ] [         0]
[sellTicket        ] [setExchangePattern] [setExchangePattern[InOnly]                                                    ] [         0]
[sellTicket        ] [threads4          ] [threads                                                                       ] [         1]
[sellTicket        ] [bean14            ] [bean[com.xxxxxxxxxxxx.integration.camel.processor.TicketProcessor@76c548f]    ] [         0]
[sellTicket        ] [bean17            ] [bean[com.xxxxxxxxxxxx.integration.camel.processor.PersistTicketProcessor@7d061] [         0]
[sellTicket        ] [wireTap1          ] [wireTap[direct:ceRequest]                                                     ] [         0]
[sellTicket        ] [bean18            ] [bean[com.xxxxxxxxxxxx.integration.camel.transformer.TerminalReplyTransformer@5] [         0]
[sellTicket        ] [bean19            ] [bean[com.xxxxxxxxxxxx.integration.camel.processor.LatencyMeasuringProcessor@62] [         0]
[sellTicket        ] [to2               ] [jms:terminalReply                                                             ] [         1]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> !!!!!!!!!!!!!!! KEEPS Repeating FROM (bean14 ==> to2)! !!!!!!!!!!!!! <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
[ceRequest         ] [setExchangePattern] [setExchangePattern[InOnly]                                                    ] [         0]
[ceRequest         ] [threads5          ] [threads                                                                       ] [     74291]
[ceRequest         ] [bean20            ] [bean[com.xxxxxxxxxxxx.integration.camel.processor.ExternalRequestRetriver@4303] [         0]
[ceRequest         ] [marshal1          ] [marshal[org.apache.camel.model.dataformat.JsonDataFormat@39e67516]            ] [         0]
[ceRequest         ] [to3               ] [jms:ceRequest                                                                 ] [     74161]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Error sending message - transport error (streaming response timeout)
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:431)
    at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:385)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)
    at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136)
    at org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: javax.jms.JMSException: Error sending message - transport error (streaming response timeout)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at com.solacesystems.jms.impl.JMSExceptionValue.newInstance(JMSExceptionValue.java:32)
    at com.solacesystems.jms.impl.JCSMPExceptionMapper$ArrayListMapper.get(JCSMPExceptionMapper.java:31)
    at com.solacesystems.jms.impl.JCSMPExceptionMapper.get(JCSMPExceptionMapper.java:90)
    at com.solacesystems.jms.impl.Validator.createJMSException(Validator.java:541)
    at com.solacesystems.jms.SolMessageProducer.sendMessage(SolMessageProducer.java:343)
    at com.solacesystems.jms.SolMessageProducer.send(SolMessageProducer.java:155)
    at org.springframework.jms.connection.CachedMessageProducer.send(CachedMessageProducer.java:179)
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:635)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:336)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:275)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:493)
    ... 19 common frames omitted
Caused by: com.solacesystems.jcsmp.JCSMPTransportException: streaming response timeout
    at com.solacesystems.jms.impl.FlowMessageProducerAdapter.send(FlowMessageProducerAdapter.java:105)
    at com.solacesystems.jms.SolMessageProducer.sendMessage(SolMessageProducer.java:336)
    ... 27 common frames omitted

Have you read the documentation? See the box titled "Beware" at the top of this page:

  • http://camel.apache.org/dynamic-router
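
The warning on that page is that the dynamic router re-evaluates its expression after every hop and only stops when the expression returns null. An expression such as header("DEFAULT_ENDPOINT") always yields "direct:main", so the route never ends. The following is a minimal plain-Java model of that loop, not Camel's actual implementation; the class DynamicRouterModel, its methods, and the maxHops safety limit are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model of the dynamic router loop (an assumption for illustration,
// not Camel code): ask the slip function for the next endpoint until it returns null.
class DynamicRouterModel {

    // Returns the endpoints visited. maxHops is a hypothetical safety limit
    // so that a never-ending slip function cannot hang this demo.
    public static List<String> route(UnaryOperator<String> slip, int maxHops) {
        List<String> visited = new ArrayList<>();
        String previous = null; // no endpoint has been visited yet
        while (visited.size() < maxHops) {
            String next = slip.apply(previous);
            if (next == null) {
                break; // null signals the end of routing
            }
            visited.add(next);
            previous = next;
        }
        return visited;
    }

    // Behaves like a constant header expression: it never returns null.
    public static String constantSlip(String previous) {
        return "direct:main";
    }

    // Routes once, then returns null to end the loop.
    public static String oneShotSlip(String previous) {
        return previous == null ? "direct:main" : null;
    }

    public static void main(String[] args) {
        // Stopped only by the safety limit: five hops to direct:main.
        System.out.println(route(DynamicRouterModel::constantSlip, 5));
        // Stops by itself after one hop.
        System.out.println(route(DynamicRouterModel::oneShotSlip, 5));
    }
}
```

In a real route, a method like oneShotSlip would be a bean method passed to dynamicRouter(), and it would need some way to know it has already routed the exchange. The Camel documentation describes an exchange property, Exchange.SLIP_ENDPOINT, that can serve this purpose; check the exact annotation for your Camel version.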

If you only want to send the message once to a dynamically computed endpoint, see this FAQ:

  • http://camel.apache.org/how-to-use-a-dynamic-uri-in-to.html
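
As a rough sketch of what that FAQ suggests, the inbound route from the question could use recipientList() instead of dynamicRouter(). This is a fragment meant to sit inside RouteBuilder.configure() with Apache Camel on the classpath, not a standalone program. It reuses inboundUri and DEFAULT_ENDPOINT from the question and leaves out the threads() and bean(dummyProcessor) steps for brevity.

```java
// Fragment for RouteBuilder.configure(); not runnable on its own.
from(inboundUri).routeId("generic-jms-inbound")
    .setExchangePattern(ExchangePattern.InOnly)
    .setHeader(DEFAULT_ENDPOINT, constant("direct:main"))
    // The header is evaluated once per exchange, so the message is
    // sent a single time to the endpoint it names.
    .recipientList(header(DEFAULT_ENDPOINT));
```

This also removes the need for the ProducerTemplate call inside process(), because the route itself performs the dynamic send.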
