Spring Integration DSL-使用网关块线程进行组合



我正在努力学习如何将IntegrationFlow构建为单元,并将它们连接起来。

我设置了一个非常简单的处理集成流程:


IntegrationFlow processingFlow = f -> f
.<String>handle((p, h) -> process(p))
.log();
flowContext.registration(processingFlow)
.id("testProcessing")
.autoStartup(false)
.register();

处理非常简单:

public String process(String process) {
return process + " has been processed";
}

然后我从一个源组成一个流,使用.gateway()将源加入处理:

MessageChannel beginningChannel = MessageChannels.direct("beginning").get();
StandardIntegrationFlow composedFlow = IntegrationFlows
.from(beginningChannel)
.gateway(processingFlow)
.log()
.get();
flowContext.registration(composedFlow)
.id("testComposed")
.autoStartup(false)
.addBean(processingFlow)
.register();

然后我启动流程并发送几条消息:

composedFlow.start();
beginningChannel.send(MessageBuilder.withPayload(new String("first string")).build());
beginningChannel.send(MessageBuilder.withPayload(new String("second string")).build());

日志处理程序确认已经为第一条消息调用了handle方法,但主线程随后处于空闲状态,并且永远不会处理第二条消息。

这不是从构建块组成集成流的正确方式吗?要对通道执行此操作,需要将通道注册为bean,我正在尝试动态地执行所有这些操作。

必须是processingFlow中的logAndReply()。请参阅他们的JavaDocs以了解差异。流末尾的log()使其成为单向的。这就是为什么您被阻止的原因,因为网关在等待回复,但根据您当前的流定义并没有人。不幸的是,我们无法从框架级别确定这一点:在某些情况下,您可能确实没有根据某些路由或过滤逻辑返回。网关可以配置一个回复超时。默认情况下,它是无限的。

最新更新