当放置相同的文件进行重新处理时,未调用Spring Integration FileAggregator



在下面的流程中,文件拆分和聚合可以很好地处理新的(不同的(文件。当Poller再次放置并拾取处理过的文件时,它将进入FileSplitterSpec(),但永远不会到达FileAggregator

处理后的文件上是否有任何标记,表明下次无法到达聚合器?

return IntegrationFlows.from(txnChannel())
.split(fileSplitterSpec())
.filter(payload -> !(payload instanceof FileMarker), e -> e.discardChannel("aggregatorChannel"))
.<String> filter(StringUtils::isNotBlank, e -> e.discardChannel("aggregatorChannel"))
.transform(transformer, "transform")
.handle((payload, headers) -> {
headers.get("indHeader", DTO.class).getTxn().add(payload);
return payload;
})
.channel("aggregatorChannel")
.aggregate(new FileAggregator())

@Bean
public FileSplitterSpec fileSplitterSpec() {
return Files.splitter()
.markers()
.firstLineAsHeader("someHeader");
}

是的,组聚合的相关键基于文件名。您需要使用不同的文件名,或者考虑为expireGroupsUponCompletion(true)配置聚合器。new FileAggregator()应该转到聚合器规范的processor()选项。

最新更新