我正在尝试实现以下场景:
- 我得到一堆具有共同文件模式的文件,即doc0001_page0001、doc0001_page0002、doc0001_page0003、doc0002_page0001(其中 doc0001 是一个由 3 页组成的文档,我需要合并,doc0002 只有 1 页)
- 我想以一种仅在收集特定文档的所有文件时才释放组的方式聚合它们(拾取 3 个文件后的 doc0001,1 个文件后的 doc0002 )
我的想法是按字母顺序读取文件,并在上次修改组后等待 2 秒以释放它(g.getLastModified()
小于当前时间减去 2 秒)
我尝试了以下方法但没有成功:
return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
.patternFilter("*.json")
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
FileReadingMessageSource.WatchEventType.MODIFY),
e -> e.poller(Pollers.fixedDelay(100)
.errorChannel("filePollingErrorChannel")))
.enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000)) .channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
将发布策略更改为以下内容也不起作用:
.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
.releaseStrategy(g -> {
Stream<Message<?>> stream = g.getMessages()
.stream();
Long timestamp = (Long) stream.skip(stream.count() - 1)
.findFirst()
.get()
.getHeaders()
.get(MessageHeaders.TIMESTAMP);
System.out.println("Timestamp: " + timestamp);
return timestamp.longValue() < System.currentTimeMillis() - 2000;
}))
我是否误解了发布策略概念?
另外,是否可以从发布策略块中打印出一些内容?我想比较时间戳(见System.out.println("Timestamp: " + timestamp);
)
是的,由于您不知道消息组的整个序列,因此除非使用groupTimeout
,否则您别无选择。常规releaseStrategy
仅在消息到达聚合器时起作用。由于在一条消息中,您没有足够的信息来释放组,因此它将永远位于组存储中。
聚合器引入了groupTimeout
选项,特别是对于这种用例,当我们绝对希望发布一个没有足够的消息来正常分组的组时。
您可以考虑使用groupTimeoutExpression
而不是基于常量的groupTimeout
。MessageGroup
是 SpEL 的根评估上下文对象,因此您将能够访问上述lastModified
。
.sendPartialResultOnExpiry(true)
是在这里处理的正确选择。
在文档中查看详细信息:https://docs.spring.io/spring-integration/reference/html/#agg-and-group-to
我用不同的方法找到了解决方案。我仍然不明白为什么上面的不起作用。
我还找到了一种更简洁的方式来定义相关函数。
IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
.patternFilter("*.json")
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE, FileReadingMessageSource.WatchEventType.MODIFY), e -> e
.poller(Pollers.fixedDelay(100)))
.enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, m -> ((String) m
.getHeaders()
.get(FileHeaders.FILENAME)).substring(0, 17)))
.aggregate(a -> a.groupTimeout(2000)
.sendPartialResultOnExpiry(true))
.channel(MessageChannels.queue("fileReadingResultChannel"))
.get();