Dead letter queue in Camel with ActiveMQ



I am using the Dead Letter Channel EIP as described in the Camel documentation for the Dead Letter Channel. Here is my camel.xml (headers removed):

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <!-- this is the Dead Letter Channel error handler, where we send failed message to a log endpoint -->

  <route errorHandlerRef="myDeadLetterErrorHandler">
    <from uri="jms:q1" />
    <doTry>
      <validate><simple>${body[subsc]} regex '[a-z0-9]{8}' </simple> </validate>
      <choice>
        <when>
          <simple>${body[key1]} regex '^(v1)$'</simple>
          <choice>
            <when>
              <simple>${body[key2]} == 'v2'</simple>
              <to uri="jms:getInfo" />
            </when>
          </choice>
        </when>
        <otherwise>
          <to uri="jms:invalid" />
          <stop />
        </otherwise>
      </choice>
      <doCatch>
        <exception>org.apache.camel.ValidationException</exception>
        <to uri="jms:invalid"/>
      </doCatch>
    </doTry>
  </route>
  <route>
    <from uri="jms:queue:deadlc" />
    <log message="Got ${body}" loggingLevel="ERROR" logName="cool" />
  </route>
</camelContext>
<!--
  Let's configure some Camel endpoints
  http://camel.apache.org/components.html
-->
<!-- configure the camel activemq component to use the current broker -->
<bean id="confact" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="vm://amq-broker?create=false"/>
  <property name="userName" value="${activemq.username}"/>
  <property name="password" value="${activemq.password}"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory" ref="confact" />
</bean>
<bean id="pooledConnectionFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start"
      destroy-method="stop">
  <property name="maxConnections" value="8" />
  <property name="connectionFactory" ref="confact" />
  <property name="expiryTimeout" value="-1" />
  <property name="idleTimeout" value="-1" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="pooledConnectionFactory"/>
  <property name="concurrentConsumers" value="8"/>
</bean>
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig"/>
</bean>

<bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder">
  <property name="deadLetterUri" value="jms:queue:deadlc"/>
  <property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/>
</bean>
<bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy">
  <property name="maximumRedeliveries" value="3"/>
  <property name="redeliveryDelay" value="5000"/>
</bean>

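I have only one route with a content-based router. Essentially it says: if the message body contains getInfo, route from jms:foo to jms:getInfo; if the message body contains performAction, route from jms:foo to jms:performAction.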
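As a rough illustration of the router described above, a minimal sketch might look like the following. The map key `action` is a hypothetical name (the description does not say which field carries getInfo or performAction), and the fragment is assumed to sit inside the existing <camelContext> element.

```xml
<route errorHandlerRef="myDeadLetterErrorHandler">
  <from uri="jms:foo" />
  <choice>
    <when>
      <!-- "action" is a hypothetical key in the map message body -->
      <simple>${body[action]} == 'getInfo'</simple>
      <to uri="jms:getInfo" />
    </when>
    <when>
      <simple>${body[action]} == 'performAction'</simple>
      <to uri="jms:performAction" />
    </when>
  </choice>
</route>
```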

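I expected that when the jms:getInfo consumer is not running, delivery would fail and redelivery would be attempted as specified in the Spring XML. But nothing of the sort happens: the message never reaches the dead letter queue, and instead ActiveMQ throws the exception shown below.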

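Can anyone explain why this is not working? My understanding was that when the consumer (on the destination that receives messages from the content-based router) is not running, ActiveMQ knows it should not even send the message and should throw an exception immediately, which would then be handled by Camel's Dead Letter Channel as configured.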

I am using Camel 2.10.3 and ActiveMQ 5.8.0.

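I suspect ActiveMQ is ignoring the dead letter queue configuration in camel.xml when it handles failed deliveries, because the failed messages show up in ActiveMQ.DLQ in the ActiveMQ web console. If the consumer is running, messages are delivered successfully according to the routes specified in camel.xml.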

20:58:46,317 | DEBUG | Sending JMS message to: queue://getInfo with message: ActiveMQMapMessage{commandId=0, responseRequired=false, messageId=null, originalDestination=null, originalTransactionId=null, producerId=null, destination=null, transactionId=null, expiration=0, timestamp=0, arrival=0, brokerInTime=0, brokerOutTime=0, correlationId=a79a0f9e-c88f-4501-a56d-2d5667aa98e0, replyTo=temp-queue://ID:myhost-57639-1382233787365-3:3:6, persistent=false, type=null, priority=4, groupID=null, groupSequence=0, targetConsumerId=null, compressed=false, userID=null, content=null, marshalledProperties=null, dataStructure=null, redeliveryCounter=0, size=0, properties={breadcrumbId=ID:myhost-57439-138222345453-1:1:1:6, CamelJmsDeliveryMode=1}, readOnlyProperties=false, readOnlyBody=false, droppable=false} ActiveMQMapMessage{Table={cluster=x78}} | org.apache.camel.component.jms.JmsConfiguration | Camel (camel) thread #2 - JmsConsumer[bar]
20:58:48,044 | DEBUG | Evicting inactive entry ID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e (times out after 20000 millis) | org.apache.camel.component.jms.reply.CorrelationTimeoutMap | Camel (camel) thread #9 - JmsReplyManagerTimeoutChecker[getInfo]
20:58:48,044 | WARN | Timeout occurred after 20000 millis waiting for reply message with correlationID [8b0d77a3-afef-4612-a49f-22c1b81eb80e]. Setting ExchangeTimedOutException on (MessageId: ID:myhost-57439-138222345453-1:1:1:5 on ExchangeId: ID-myhost-57640-1382233788284-0-2) and continue routing. | org.apache.camel.component.jms.reply.TemporaryQueueReplyManager | Camel (camel) thread #9 - JmsReplyManagerTimeoutChecker[getInfo]
20:58:48,045 | WARN | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e not received. Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-138222345453-1:1:1:5]]] | org.apache.camel.component.jms.EndpointMessageListener | Camel (camel) thread #4 - JmsConsumer[bar]
org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e not received. Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-138222345453-1:1:1:5]]
    at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1271)
    at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:187)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:108)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e not received. Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-138222345453-1:1:1:5]]
    at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:133)
    at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:61)
    at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.onEviction(CorrelationTimeoutMap.java:53)
    at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.onEviction(CorrelationTimeoutMap.java:30)
    at org.apache.camel.support.DefaultTimeoutMap.purge(DefaultTimeoutMap.java:209)
    at org.apache.camel.support.DefaultTimeoutMap.run(DefaultTimeoutMap.java:159)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecu

Your JMS message has replyTo = temp-queue://ID:myhost-57639-1382233787365-3:3:6. Camel is expecting a reply message, which is why you get the timeout after 20000 milliseconds.
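If no reply is actually wanted, one way to avoid the request/reply wait is to make the send to jms:getInfo one-way. The fragment below is only a sketch under that assumption, stripped down to the essential endpoints of the route in the question. It uses the camel-jms endpoint options `exchangePattern` and `disableReplyTo`; whether both are needed depends on whether the messages arriving on jms:q1 carry a JMSReplyTo header, which the post does not show.

```xml
<route errorHandlerRef="myDeadLetterErrorHandler">
  <!-- disableReplyTo=true: ignore any JMSReplyTo header on incoming messages -->
  <from uri="jms:q1?disableReplyTo=true" />
  <!-- exchangePattern=InOnly: send one-way, do not wait for a reply -->
  <to uri="jms:getInfo?exchangePattern=InOnly" />
</route>
```

Note that with a one-way send, a missing consumer on getInfo is not an error at all: the broker simply keeps the message on the queue until a consumer connects, so Camel's Dead Letter Channel would not be triggered by that situation either.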
