Spring Integration DSL语法问题-如何动态构建子流



我正试图在Spring Integration中构建一个复杂的流,其中子流在运行时动态定义。在主流定义中运行良好的代码在子流定义中编译失败。由于结构看起来完全相同,所以不清楚发生了什么。任何解释都将不胜感激。

提前谢谢。

主流定义编码如下:

StandardIntegrationFlow flow = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
// This one compiles fine
.enrichHeaders(h -> h.headerExpression("start", "start")")
.headerExpression("end", "payload[0].get("end")"))
.split(tableSplitter)
.enrichHeaders(h -> h.headerExpression("object", "payload[0].get("object")"))
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
.aggregate()
.handle(cleanupAdapter).get();

buildRecipientListRouterSpecForRules定义为:

private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
Collection<RuleMetadata> rules) {
rules.forEach(
rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
return recipientListSpec;
}

createFlowDefForRule((只是一个switch()包装器,用于为规则定义的流选择要运行的实际DSL。这是的样本

public IntegrationFlowDefinition constructASpecificFlowDef(IntegrationFlowDefinition flowDef, RuleMetadata rule) {
return flowDef
// This enrichHeaders element fails to compile,
// The method headerExpression(String, String) is undefined for the type Object
.enrichHeaders(h -> h.headerExpression("ALC_operation", "payload[0].get("ALC_operation")"));
}

通常,最好将此类解释放在问题文本中,而不是在代码片段中作为注释;我完全错过了那个评论。

你能提供一个精简(更简单(的例子(完整的类(来展示这种行为,这样我们就可以玩了吗?

我试着简化你正在做的事情,这编译得很好,并按预期工作:

@SpringBootApplication
public class So65010958Application {
public static void main(String[] args) {
SpringApplication.run(So65010958Application.class, args);
}
@Bean
IntegrationFlow flow() {
return IntegrationFlows.from("foo")
.routeToRecipients(r -> r.recipientFlow("true", f -> buildFlow(f)))
.get();
}
private IntegrationFlowDefinition<?> buildFlow(IntegrationFlowDefinition<?> f) {
return f.enrichHeaders(h -> h.headerExpression("foo", "'bar'"))
.channel(MessageChannels.queue("bar"));
}
@Bean
public ApplicationRunner runner(MessageChannel foo, PollableChannel bar) {
return args -> {
foo.send(new GenericMessage<>("foo"));
System.out.println(bar.receive(0));
};
}
}

GenericMessage〔payload=foo,headers={foo=bar,id=d526b8fb-c6f8-7731-b1ad-e68e326fcc00,timestamp=1606333567749}〕

所以,我一定错过了什么。

最新更新