仅聚合具有相同关联键的连续交换



我使用Apache Camel并获得一个大文件作为输入,我必须逐行处理。内容已经排序,我必须用相同的相关键聚合所有连续的行。如果关联键更改,则必须完成先前的聚合。如果文件结束,那么最后的聚合也完成了。我有一些限制条件:-因为传入的文件相当大,我们希望以流式方式处理它。-因为结果给了一个同步端点,我不想使用超时完成谓词。否则,我将失去调节数据源消耗速度的背压,并且交换将积聚在AggregateProcessor的超时映射和聚合存储库中。

PreCompletionAwareAggregationStrategy看起来像一个很有前途的解决方案,但事实证明,最后一个聚合将不会完成,直到下一个文件到达。如果我在preComplete中使用CamelSplitComplete属性,则最后一次聚合完成,但没有最后一次传入交换。相反,这最后一次交换将被添加到下一个到达的文件的内容中。

所以目前我在寻找一个不太难看的解决方案。

在描述的场景中,我将使用聚合器(让我们称之为"AggregationRoute")向路由发送拆分消息,其聚合策略实现了PreCompletionAwareAggregationStrategy(我猜您已经使用它的方式)。然后,当分裂结束时,将AGGREGATION_COMPLETE_ALL_GROUPS头设置为true,并将其发送给AggregationRoute。此交换将仅用作完成所有聚合组的信号。

的例子:


    ...
    .split(body()).streaming()
        .to("direct:aggregationRoute")
    .end()
    .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
    .to("direct:aggregationRoute");
from("direct:aggregationRoute")
    .aggregate([your correlation expression]), myAggregationStrategy)
    ...

另一种选择是使用AggregateController通过调用其方法forceCompletionOfAllGroups()来结束所有组的聚合:


AggregateController aggregateController = new DefaultAggregateController();
from(...)
    ...
    .split(body()).streaming()
        .aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
            ...
            // Do what you need to do with the aggregated exchange
            ...
        .end()
    .end()
    .bean(aggregateController, "forceCompletionOfAllGroups")

好吧,也许一种方法是,因为您的数据已经排序,所以可以以流式方式进行解析,并将具有相同correlationkey的每一行添加到某些散列表结构中。一旦遇到新的correlationkey,您实际上希望"刷新"哈希映射以创建新消息,然后重新启动相同的进程。看看这里:http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html

相关内容

  • 没有找到相关文章

最新更新