如何通过DSL编写Spring Integration流



我正在尝试弄清楚如何将弹簧集成流拆分为多个子流并将它们组合在一起。 最终,我试图布置一种模式,在该模式中,我可以创建子流模块,这些模块可以拼凑在一起用于常见的集成配方。

此测试用例表示尝试(但失败(使用 DSL IntegrationFlow API 将子流连接在一起的最小示例:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ComposedIntegrationFlowTest.class })
@SpringIntegrationTest
@EnableIntegration
public class ComposedIntegrationFlowTest {
@Test
public void test() {
MessageChannel beginningChannel = MessageChannels.direct("beginning").get();
IntegrationFlow middleFlow = f -> f
.transform("From middleFlow: "::concat)
.transform(String.class, String::toUpperCase);
IntegrationFlow endFlow = f -> f
.handle((p, h) -> "From endFlow: " + p);
StandardIntegrationFlow composedFlow = IntegrationFlows
.from(beginningChannel)
.gateway(middleFlow)
.gateway(endFlow)
.get();
composedFlow.start();
beginningChannel.send(MessageBuilder.withPayload("hello!").build());
}
}

尝试上述操作,我得到:

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'beginning'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=hello!, headers={id=2b1de253-a822-42ba-cd85-009b83a644eb, timestamp=1537890950879}], failedMessage=GenericMessage [payload=hello!, headers={id=2b1de253-a822-42ba-cd85-009b83a644eb, timestamp=1537890950879}]

如何将这些子流拼凑在一起? 有没有更好的 API 来尝试进行这种组合? 此集成测试的结构是否正确?

composedFlow.start((;

这还不够;您需要向IntegrationFlowContext注册动态流,以便所有支持 bean 都注册到应用程序上下文中。

Autowire IntegrationFlowContext ...

@Autowired
IntegrationFlowContext integrationFlowContext;

然后注册您的流... ...

integrationFlowContext.registration(integrationFlow).register();

最新更新