我正在努力学习如何将IntegrationFlow构建为单元,并将它们连接起来。
我设置了一个非常简单的处理集成流程:
IntegrationFlow processingFlow = f -> f
.<String>handle((p, h) -> process(p))
.log();
flowContext.registration(processingFlow)
.id("testProcessing")
.autoStartup(false)
.register();
处理非常简单:
public String process(String process) {
return process + " has been processed";
}
然后我从一个源组成一个流,使用.gateway()
将源加入处理:
MessageChannel beginningChannel = MessageChannels.direct("beginning").get();
StandardIntegrationFlow composedFlow = IntegrationFlows
.from(beginningChannel)
.gateway(processingFlow)
.log()
.get();
flowContext.registration(composedFlow)
.id("testComposed")
.autoStartup(false)
.addBean(processingFlow)
.register();
然后我启动流程并发送几条消息:
composedFlow.start();
beginningChannel.send(MessageBuilder.withPayload(new String("first string")).build());
beginningChannel.send(MessageBuilder.withPayload(new String("second string")).build());
日志处理程序确认已经为第一条消息调用了handle方法,但主线程随后处于空闲状态,并且永远不会处理第二条消息。
这不是从构建块组成集成流的正确方式吗?要对通道执行此操作,需要将通道注册为bean,我正在尝试动态地执行所有这些操作。
必须是processingFlow
中的logAndReply()
。请参阅他们的JavaDocs以了解差异。流末尾的log()
使其成为单向的。这就是为什么您被阻止的原因,因为网关在等待回复,但根据您当前的流定义并没有人。不幸的是,我们无法从框架级别确定这一点:在某些情况下,您可能确实没有根据某些路由或过滤逻辑返回。网关可以配置一个回复超时。默认情况下,它是无限的。