我对路由的理解(用Apache Camel的措辞)是,它表示从一个端点到另一个端点的数据流,并且在对数据执行EIP类型操作的过程中,它将在各个处理器处停止。
如果这是对一条路线的正确/公平评估,那么我正在建模一个问题,我认为需要在同一CamelContext
内有多条路线(我使用的是Spring):
- 路由1:从Source-1中提取数据,对其进行处理,将其转换为
List<SomePOJO>
,然后将其发送到聚合器 - 路由2:从Source-2中提取数据,对其进行处理,还将其转换为
List<SomePOJO>
,然后将其发送到聚合器 - 路由3:包含一个聚合器,该聚合器等待,直到它从路由1和路由2接收到
List<SomePOJO>
,然后继续处理聚合列表
事情是这样的:两个List<SomePOJO>
需要同时到达聚合器,或者更确切地说,聚合器bean必须等待,直到它从两个路由接收到数据,然后才能将这两个列表聚合为一个List<SomePOJO>
,并将聚合列表发送到路由3的其余部分。
到目前为止,我有以下伪编码的<camelContext>
:
<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
<!-- Route 1 -->
<route id="route-1">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
<to uri="bean:extractor1?method=process" />
<!-- Send to aggregator. -->
<to uri="direct:aggregator" />
</route>
<!-- Route 2 -->
<route id="route-2">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
<to uri="bean:extractor2?method=process" />
<!-- Send to aggregator. -->
<to uri="direct:aggregator" />
</route>
<!-- Route 3 -->
<route id="route-3">
<from uri="direct:aggregator" />
<aggregate strategyRef="listAggregatorStrategy">
<correlationExpression>
<!-- Haven't figured this part out yet. -->
</correlationExpression>
<to uri="bean:lastProcessor?method=process" />
</aggregate>
</route>
</camelContext>
<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />
然后在Java中:
public class ListAggregatorStrategy implements AggregatoryStrategy {
public Exchange aggregate(Exchange exchange) {
List<SomePOJO> route1POJOs = extractRoute1POJOs(exchange);
List<SomePOJO> route2POJOs = extractRoute2POJOs(exchange);
List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(route1POJOs);
aggregateList.addAll(route2POJOs);
return aggregateList;
}
}
我的问题
- 我的基本设置正确吗?换句话说,我是否正确使用
direct:aggregator
端点将数据从route-1
和route-2
发送到route-3
的聚合器 - 我的聚合器会像我期望的那样工作吗?假设
route-1
中的extractor1
bean只需要5秒即可运行,但route-2
中的extractor2
bean需要2分钟即可运行。在t=5时,聚合器应该从extractor1
接收数据,并开始等待(2分钟),直到extractor2
完成并将剩余的数据提供给它进行聚合。是吗
听起来你走在了正确的轨道上,Aggregator页面有很多关于这方面的好信息。
<correlationExpression>
是从每个路由匹配Exchange的关键,completionSize可以指定等待的数量。在您的情况下,每个路由似乎只设计为运行一次,在这种情况下,表达式可以使用每个Exchange的固定头值,否则您将需要为每个路由提供类似计数器类的东西。
以下是您的示例的更新:
<route id="route-1">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<to uri="bean:extractor1?method=process" />
<setHeader headerName="id">
<constant>myHeaderValue</constant>
</setHeader>
<to uri="direct:aggregator" />
</route>
<route id="route-2">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<to uri="bean:extractor2?method=process" />
<setHeader headerName="id">
<constant>myHeaderValue</constant>
</setHeader>
<to uri="direct:aggregator" />
</route>
<route id="route-3">
<from uri="direct:aggregator" />
<aggregate strategyRef="listAggregatorStrategy" completionSize="2">
<correlationExpression>
<simple>header.id</simple>
</correlationExpression>
<to uri="bean:lastProcessor?method=process" />
</aggregate>
</route>