我使用Apache Camel并获得一个大文件作为输入,我必须逐行处理。内容已经排序,我必须用相同的相关键聚合所有连续的行。如果关联键更改,则必须完成先前的聚合。如果文件结束,那么最后的聚合也完成了。我有一些限制条件:-因为传入的文件相当大,我们希望以流式方式处理它。-因为结果给了一个同步端点,我不想使用超时完成谓词。否则,我将失去调节数据源消耗速度的背压,并且交换将积聚在AggregateProcessor的超时映射和聚合存储库中。
PreCompletionAwareAggregationStrategy看起来像一个很有前途的解决方案,但事实证明,最后一个聚合将不会完成,直到下一个文件到达。如果我在preComplete中使用CamelSplitComplete属性,则最后一次聚合完成,但没有最后一次传入交换。相反,这最后一次交换将被添加到下一个到达的文件的内容中。
所以目前我在寻找一个不太难看的解决方案。
在描述的场景中,我将使用聚合器(让我们称之为"AggregationRoute")向路由发送拆分消息,其聚合策略实现了PreCompletionAwareAggregationStrategy(我猜您已经使用它的方式)。然后,当分裂结束时,将AGGREGATION_COMPLETE_ALL_GROUPS头设置为true,并将其发送给AggregationRoute。此交换将仅用作完成所有聚合组的信号。
的例子:
...
.split(body()).streaming()
.to("direct:aggregationRoute")
.end()
.setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
.to("direct:aggregationRoute");
from("direct:aggregationRoute")
.aggregate([your correlation expression]), myAggregationStrategy)
...
另一种选择是使用AggregateController通过调用其方法forceCompletionOfAllGroups()来结束所有组的聚合:
AggregateController aggregateController = new DefaultAggregateController();
from(...)
...
.split(body()).streaming()
.aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
...
// Do what you need to do with the aggregated exchange
...
.end()
.end()
.bean(aggregateController, "forceCompletionOfAllGroups")
好吧,也许一种方法是,因为您的数据已经排序,所以可以以流式方式进行解析,并将具有相同correlationkey的每一行添加到某些散列表结构中。一旦遇到新的correlationkey,您实际上希望"刷新"哈希映射以创建新消息,然后重新启动相同的进程。看看这里:http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html