我正在实现一个从数据库中读取行的路由,使用拆分器拆分它们,并行处理它们,并聚合和更新数据库。当分路器路由只有一个输入时,一切都按预期进行。样本代码-
<route>
<from uri="direct:splitter"/>
<log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}" />
<split strategyRef="aggregatorStrategy" executorServiceRef="myPool">
<simple>${body}</simple>
<log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}, split index - $simple{property.CamelSplitIndex}" />
<to uri="bean:gisResponseProcessor" />
</split>
</route>
当我将3条消息发送到direct:splitter时(每条消息需要几分钟才能完成处理),并让它们全部并行处理。当我尝试这样做时,分离器外所有3个输入的第一条日志消息会立即打印出来。但是,来自拆分器内部的日志消息表明,3个交换机中的每一个都被逐个拆分。每个子消息的子消息都使用线程池。有没有一种方法可以使拆分器并行拆分3个输入交换机?
是,使用seda
路由或vm
路由。这两种方法都有相同的作用,但用途略有不同。我建议你看看我关于差异的答案:骆驼分离器并行处理。
您面临的问题是,您希望将消息并行发送到拆分器。因此,消息1、2和3被并行处理。这可以通过使用seda
组件来实现。seda
组件是异步的,允许您并行处理路由上的消息。
试试这个路线:
<route>
<from uri="direct:splitter"/>
<log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}" />
<to uri="seda:sedaSplitter"/>
</route>
<route id="sedaSplitter>
<from uri="seda:sedaSplitter?multipleConsumers=true&concurrentConsumers=16"/>
<split strategyRef="aggregatorStrategy" executorServiceRef="myPool">
<simple>${body}</simple>
<log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}, split index - $simple{property.CamelSplitIndex}" />
<to uri="bean:gisResponseProcessor" />
</split>
</route>
注意,消息被处理到seda
组件路由,其中MESSAGE
在parallel中被处理。您可以将parallelProcessing="true"
包含在拆分器中,以便并行处理拆分中的各个项目,从而使其更加并行。
如果你需要更多的信息,大声呼喊。
检查名为myPool
的线程池中是否至少有6个线程可用