下面我有Camel(通过Spring DSL)成功地将我的bean与ActiveMQ队列集成:
<!-- Note: this code is just a snippet; if you need to see more, please let me know! -->
<camelContext id="my-camel-context" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="activemq-myinstance:queue:myqueue" />
<onException>
<exception>java.lang.Exception</exception>
<redeliveryPolicy maximumRedeliveries="2" />
<to uri="activemq-myinstance:queue_failures" />
</onException>
<to uri="bean:myBean?method=doCommand" />
</route>
</camelContext>
<bean id="jmsConnectionFactory-myqueue" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.instance.url}" />
</bean>
<bean id="pooledConnectionFactory-myqueue" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="64" />
<property name="maximumActive" value="${max.active.consumers}" />
<property name="connectionFactory" ref="jmsConnectionFactory-myqueue" />
</bean>
<bean id="jmsConfig-myqueue" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory-myqueue"/>
<property name="concurrentConsumers" value="${max.active.consumers}"/>
</bean>
<bean id="activemq-myqueue" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig-myqueue"/>
</bean>
我想显式强制套接字超时(在Socket.read()
上)-在Camel和ActiveMQ之间- 25秒。因此,当Camel尝试将消息路由到ActiveMQ时,如果ActiveMQ需要超过25秒才能完成响应,我希望线程能够优雅地退出。显然,如果还可以设置某种故障转移(以便可以在将来重新播放超时请求),那么这比仅仅丢失消息要好得多!
我怎样才能做到这一点?提前感谢!
更新:如果Camel/JMS/ActiveMQ不支持这个开箱,我不介意写我自己的"ThreadManager
"在25秒后中断/停止线程,但我不确定要实现/扩展什么接口/类,并随后连接到我的Spring bean
默认情况下,Camel activemq请求在20秒后超时。
对于发送超时,org.apache.activemq.ActiveMQConnectionFactory有一个sendTimeout属性。还要检查jmsconfiguration的requesttimeout选项。
对于故障转移,可以在代理url中设置故障转移传输。
在brokerURL上设置timeout
属性
failover:(tcp://localhost:61616)?timeout=25000
这会将错误传播回你的生产者,这样你就可以处理它,而不是让它永远阻塞线程…