IntegrationFlow+2个条件转换器+出站网关



我有一个集成流,它需要根据某些条件运行一个或另一个转换器,然后通过出站网关发布http请求。

@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}  

这是我的尝试,但我收到了这个错误消息";没有可用的输出信道或replyChannel报头";我想我理解为什么,但不确定如何实现我需要的。

谢谢。

在集成中,使用router模式处理条件流:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-路由章节

尽管看起来你的问题与病情的解决完全无关。

我认为您的每个handle(getOAuth2Handler(...))都会返回一些值,而这些值在这些子流中不会作为回复处理。如果您对该回复不感兴趣,请考虑在handle()之后为这些子流配置nullChannel

最新更新