Camel拆分并行处理聚合消息



可能看起来是一个微不足道的问题,但不幸的是无法使其工作

这是代码

from("direct:START")
.process( (ex) -> {
List<Integer> pages = IntStream.range(1,5).boxed().collect(Collectors.toList());
ex.getOut().setBody( pages );
})
.split(body())
.parallelProcessing()
.to("http://someurl?page=${body}");
--> Get the collective body here

如何完成这项工作!

您可以将Aggregator与completionSize表达式一起用于聚合拆分的消息。

.split(body()).parallelProcessing().to("log:splitted_body_is_here")
.aggregate(constant(true), AggregationStrategies.groupedBody())
.completionSize(exchangeProperty(Exchange.SPLIT_SIZE))
.to("log:aggregated_body_is_here")

如果您使用的是旧版本的camel(2.20.x(AggregationStrategies.groupedBody((将不可用。你可以使用其他方法。我使用了一个简单的自定义方法来执行聚合。

代码更改为

.split(body()).parallelProcessing().to("log:splitted_body_is_here")
.aggregate(constant(true), (in,out) ->{
if( in == null ){
return out;
}
else{
String body = in.getIn().getBody(String.class);
body = body + "," + out.getIn().getBody( String.class );
in.getOut().setBody( body );
return in;
}
})
.completionSize(exchangeProperty(Exchange.SPLIT_SIZE))
.to("log:aggregated_body_is_here")

上面的代码只是假设主体是String/JSON,并用逗号附加它。

看起来,您想要在to中调用具有动态url的端点。不支持,请改用toD

最新更新