带有Jms连接器的Mule流,线程阻塞在动态出站端点



我有一个jms连接器,我从一个队列接收消息,在一个流中处理消息,调用db根据消息中的一些id获取数据,并将响应输出写入文件,我使用动态出站端点来决定输出位置。

    <jms:connector name="tibco" numberOfConsumers="20" ..... >
                  .....
    </jms:connector>
    <flow name="realtime" doc:name="ServiceId-8">
    <jms:inbound-endpoint queue="${some.queue}" connector-ref="tibco" doc:name="JMS">
       <jms:transaction action="ALWAYS_BEGIN"/>
    </jms:inbound-endpoint>
    <processor ref="proc1"></processor>
    <processor ref="proc2"></processor>
    <component doc:name="Java">
        <spring-object bean="comp1"/>
    </component>
    <processor ref="proc3"></processor>
    <collection-splitter doc:name="Collection Splitter"/>
    <processor ref="endpointprocessor"></processor>
    <foreach collection="#[message.payload.consumerEndpoints]" counterVariableName="endpoints" doc:name="Foreach">
            <when expression="#[consumerEndpoint.getOutputType().equals('txt') and consumerEndpoint.getChannel().equals('file')]">
                <processor-chain>
                    <file:outbound-endpoint path="#[consumerEndpoint.getPath()]" outputPattern="#[consumerEndpoint.getClientId()]-#[attributes['eventId']]%#[consumerEndpoint.getTicSeedCount()]-#[attributes['dateTime']].tic" responseTimeout="10000" doc:name="File"/>
                </processor-chain>  
            </when>
            <when expression="#[consumerEndpoint.getOutputType().equals('txt') and consumerEndpoint.getChannel().equals('ftp')]">
                 <processor-chain>
                    <ftp:outbound-endpoint path="#[consumerEndpoint.getPath()]" outputPattern="#[consumerEndpoint.getClientId()]-#[attributes['eventId']]%#[consumerEndpoint.getTicSeedCount()]-#[attributes['dateTime']].tic" host="#[consumerEndpoint.getHost()]" port="#[consumerEndpoint.getPort()]" user="#[consumerEndpoint.getChannelUser()]" password="#[consumerEndpoint.getChannelPass()]" responseTimeout="10000" doc:name="FTP"/>
                 </processor-chain>
            </when>
         </choice>
    </foreach>
    <rollback-exception-strategy doc:name="Rollback Exception Strategy">
        <processor ref="catchExceptionCustomHandling"></processor>
    </rollback-exception-strategy>
</flow>
上面的

不是完整的流程。我把重要的部分贴上去以便理解。

问题1。由于我没有在任何级别定义任何线程策略,并且连接器有numberOfConsumers="20",如果我在队列中丢弃20个消息,将启动多少线程。设置JMS队列预取大小为20

问题2:我需要在接收端和/或流级别配置线程策略吗?

当负载非常高时(假设一分钟内队列中有15k个msg),我看到消息处理变得很慢,线程转储显示如下所示:

"TIBCO EMS Session Dispatcher (7905958)" prio=10 tid=0x00002aaadd4cf000 nid=0x3714 waiting for monitor entry [0x000000004af1e000]java.lang.Thread.State: BLOCKED(在对象监视器上)org.mule.endpoint.DynamicOutboundEndpoint.createStaticEndpoint (DynamicOutboundEndpoint.java: 153)-等待锁定<0x00002aaab711c0e0> (a org.mule.endpoint.DynamicOutboundEndpoint)

任何帮助和指示将不胜感激。

谢谢——

由于动态端点,消息处理变得很慢,当创建和使用动态出站端点时,我看到线程拥塞。我使用的是mule 3.3。在看了mule 3.4之后。我意识到动态出站端点创建的处理更加恰当。升级到3.4,问题基本解决了。

最新更新