从 CXF 端点并行消费



我遇到了从 CXF 端点并行消费的问题。例如,如果我将 50 个或更多并发请求发送到 Web 服务中,在 Camel 路由中作为 CXF 端点发布,则只有 25 个线程从中消耗并开始路由处理。无论 Web 服务器上的 SYNC/ASYNC 请求处理情况如何(默认情况下使用 Jetty(,都会发生这种情况。我试图增加码头池的大小 - 也没有效果。所以问题是:在哪里定义了来自 CXF 端点的并行消费限制。

我们有 Apache Camel 2.15.1,JBossFuse 6.2.1 下的 Apache CXF 3.0.4

以下是码头和 CXF 端点配置:

<!-- Jetty -->
<bean id="server" class="org.eclipse.jetty.server.Server"/>
<httpj:engine-factory bus="cxf">
    <httpj:identifiedThreadingParameters id="sampleThreading1">
        <httpj:threadingParameters minThreads="100" maxThreads="200"/>
    </httpj:identifiedThreadingParameters>
    <httpj:engine port="9001">
        <httpj:threadingParametersRef id="sampleThreading1"/>
        <httpj:connector>
            <bean class="org.eclipse.jetty.server.bio.SocketConnector">
                <property name = "port" value="9001" />
            </bean>
        </httpj:connector>
        <httpj:handlers>
            <bean class="org.eclipse.jetty.server.handler.DefaultHandler"/>
        </httpj:handlers>
        <httpj:sessionSupport>true</httpj:sessionSupport>
    </httpj:engine>
</httpj:engine-factory>
<!-- CXF -->
<cxf:cxfEndpoint
        id="abcOutboundService"
        address="http://localhost:9001/cxf/ABCOutbound"
        xmlns:s="http://www.smpbank.ru/ABC"
        serviceName="s:ABCOutboundRq"
        endpointName="s:ASBOABCOutPort"
        wsdlURL="model/ASBOABCOut/ABCOutboundRq.wsdl">
    <cxf:properties>
        <entry key="dataFormat" value="PAYLOAD"/>
    </cxf:properties>
</cxf:cxfEndpoint>

和路由定义:

<camelContext id="AdpABCOutReq_WS" xmlns="http://camel.apache.org/schema/blueprint">
    <route id="adpabcout.ws" startupOrder="10" errorHandlerRef="wsProcessingErrorHandler">
        <from uri="cxf:bean:abcOutboundService"/>
        <log message="REQUEST CONSUMED BY Thread:[${threadName}] FROM WEB SERVICE: Headers:[${headers}]nBody:[${body}]"/>
...
    </route>
</camelContext>

您可能遇到了 CXF 工作队列限制。您可以尝试通过以下方式设置工作队列

 <cxfcore:workqueue name="default" 
    highWaterMark="${work.queue.high.limit}" 
    lowWaterMark="${work.queue.low.limit}" 
    initialSize="${work.queue.initial.size}" 
    dequeueTimeout="${work.queue.timeout}" 
    queueSize="${work.queue.size}"/>

默认值为 highWaterMark=25lowWaterMark=5initialSize=0queueSize=256queueTimeout=2mins

最新更新