我正在尝试实现以下流程:1(从入站适配器读取文件2(他们使用带有应用序列的Publish-Cubscribe通道发送到不同的流量3(所有订户流都准备就绪后移动文件
这是主要流量
return IntegrationFlows
.from(Files.inboundAdapter(inboundOutDirectory)
.regexFilter(pattern)
.useWatchService(true)
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE),
e -> e.poller(Pollers.fixedDelay(period)
.taskExecutor(Executors.newFixedThreadPool(poolSize))
.maxMessagesPerPoll(maxMessagesPerPoll)))
.publishSubscribeChannel(s -> s
.applySequence(true)
.subscribe(f -> f
.transform(Files.toStringTransformer())
.<String>handle((p, h) -> {
return "something"
}
})
.channel("consolidateFlow.input"))
.subscribe(f -> f
.transform(Files.toStringTransformer())
.handle(Http.outboundGateway(testUri)
.httpMethod(HttpMethod.GET)
.uriVariable("text", "payload") .expectedResponseType(String.class))
.<String>handle((p, h) -> {
return "something";
})
.channel("consolidateFlow.input")))
.get();
和聚合:
public IntegrationFlow consolidateFlow()
return flow -> flow
.aggregate()
.<List<String>>handle((p, h) -> "something").log()
}
}
在发布后使用以下代码
之后.handle(Files.outboundGateway(this.inboundProcessedDirectory).deleteSourceFiles(true))
最终以
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
如果我完全没有达到整合/聚合流。
.handle(Files.outboundAdapter(this.inboundProcessedDirectory))
知道我如何解决它?目前,我正在通过从标头读取原始文件名来移动文件,但似乎不是正确的解决方案。我还考虑使用Success 逻辑将规格/建议应用于移动文件,但不确定这是否是正确的方法。
不确定。edit1 正如Artem所建议的那样,我在发布订阅中添加了另一个订户,如下所示:
...
.channel("consolidateNlpFlow.input"))
.subscribe(f -> f
.handle(Files.outboundAdapter(this.inboundProcessedDirectory).deleteSourceFiles(true))
...
文件的移动正确,但是完全没有执行ConsolidateFlow。任何想法?我还尝试将通道添加到新的Flow .channel("consolidateNlpFlow.input")
中,但并没有改变行为。
您的问题是consolidateFlow
无法返回结果。仅仅因为有任何网关。您在那里做一个明确的.channel("consolidateFlow.input")
,这意味着不会返回。这是为了您到目前为止的问题。
关于可能的解决方案。
根据您的配置,publishSubscribeChannel
中的两个订户均在同一线程上执行。因此,您很容易与该Files.outboundAdapter()
和deleteSourceFiles(true)
添加更多订户。现有订阅者将已经被调用。