发布订阅流之后,将文件从入站适配器移动



我正在尝试实现以下流程:1(从入站适配器读取文件2(他们使用带有应用序列的Publish-Cubscribe通道发送到不同的流量3(所有订户流都准备就绪后移动文件

这是主要流量

return IntegrationFlows
                .from(Files.inboundAdapter(inboundOutDirectory)
                           .regexFilter(pattern)
                           .useWatchService(true)
                           .watchEvents(FileReadingMessageSource.WatchEventType.CREATE),
                        e -> e.poller(Pollers.fixedDelay(period)
                                             .taskExecutor(Executors.newFixedThreadPool(poolSize))
                                             .maxMessagesPerPoll(maxMessagesPerPoll)))
                .publishSubscribeChannel(s -> s
                        .applySequence(true)
                        .subscribe(f -> f
                                .transform(Files.toStringTransformer())
                                .<String>handle((p, h) -> {
                                       return "something"
                                    }
                                })                                
                                .channel("consolidateFlow.input"))
                        .subscribe(f -> f
                                .transform(Files.toStringTransformer())
                                .handle(Http.outboundGateway(testUri)
                                            .httpMethod(HttpMethod.GET)
                                            .uriVariable("text", "payload")                                            .expectedResponseType(String.class))
                                .<String>handle((p, h) -> {
                                    return "something";
                                })
                                .channel("consolidateFlow.input")))
                .get();

和聚合:

public IntegrationFlow consolidateFlow()
return flow -> flow
                .aggregate()
                .<List<String>>handle((p, h) -> "something").log()
    }
}

在发布后使用以下代码

之后
.handle(Files.outboundGateway(this.inboundProcessedDirectory).deleteSourceFiles(true))

最终以

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

如果我完全没有达到整合/聚合流。

.handle(Files.outboundAdapter(this.inboundProcessedDirectory))

知道我如何解决它?目前,我正在通过从标头读取原始文件名来移动文件,但似乎不是正确的解决方案。我还考虑使用Success 逻辑将规格/建议应用于移动文件,但不确定这是否是正确的方法。

不确定。

edit1 正如Artem所建议的那样,我在发布订阅中添加了另一个订户,如下所示:

...
.channel("consolidateNlpFlow.input"))
                        .subscribe(f -> f
                                .handle(Files.outboundAdapter(this.inboundProcessedDirectory).deleteSourceFiles(true))
...

文件的移动正确,但是完全没有执行ConsolidateFlow。任何想法?我还尝试将通道添加到新的Flow .channel("consolidateNlpFlow.input")中,但并没有改变行为。

您的问题是consolidateFlow无法返回结果。仅仅因为有任何网关。您在那里做一个明确的.channel("consolidateFlow.input"),这意味着不会返回。这是为了您到目前为止的问题。

关于可能的解决方案。

根据您的配置,publishSubscribeChannel中的两个订户均在同一线程上执行。因此,您很容易与该Files.outboundAdapter()deleteSourceFiles(true)添加更多订户。现有订阅者将已经被调用。

相关内容

  • 没有找到相关文章

最新更新