Spring Integration DSL如何将拆分消息路由到不同的并发流



我可能讨厌写noob问题,就像其他人讨厌回答它们一样,但事情是这样的。

我需要根据负载中结果集的每一行中请求的操作,将从JdbcPollingChannelAdapter检索到的消息拆分为多个消息。

拆分操作非常简单。事实证明,有条件地将消息路由到一个流或另一个流是一个挑战。

经过多次尝试和错误,我相信这个流程代表了我的意图

/- insertUpdateAdapter -
Poll Table -> decorate headers -> split -> router -<                         >- aggregator -> cleanup
---- deleteAdapter ----/

为此,我构建了这个Java DSL:

final JdbcOutboundGateway inboundAdapter = createInboundAdapter();;
final JdbcOutboundGateway deleteAdapter = createDeleteAdapter();
final JdbcOutboundGateway insertUpdateAdapter = createInsertUpdateAdapter();

return IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("start", "payload[0].get("start")")
.headerExpression("end", "payload[0].get("end")"))
.handle(inboundAdapter)
.split(insertDeleteSplitter)
.enrichHeaders(h -> h.headerExpression("operation", "payload[0].get("operation")"))
.channel(c -> c.executor("stepTaskExecutor"))               
.routeToRecipients (r -> r
.recipientFlow("'I' == headers.operation or 'U' == headers.operation",
f -> f.handle(insertUpdateAdapter))
// This element is complaining "Syntax error on token ")", ElidedSemicolonAndRightBrace expected"
// Attempted to follow patterns from https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference#routers
.recipientFlow("'D' == headers.operation",
f -> f.handle(deleteAdapter))

.defaultOutputToParentFlow())
)
.aggregate()
.handle(cleanupAdapter)
.get();

我根据之前的工作做出的假设包括:

  1. 必要的通道自动创建为直接通道
  2. Route To Recipients是实现此功能的合适工具(我也考虑过表达式路由器,但如何添加子流的示例不如Route To Receivers清楚(
如果要并行运行拆分,请在拆分器和路由器之间的某个位置插入ExecutorChannel。您可以限制执行器的池大小来控制并发性。

.defaultOutputToParentFlow())后面有一个额外的括号

更正后的代码为:

return IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload[0].get("ALC_startTime")")
.headerExpression("ALC_endTime", "payload[0].get("ALC_endTime")"))
.handle(inboundAdapter)
.split(insertDeleteSplitter)
.enrichHeaders(h -> h.headerExpression("ALC_operation", "payload[0].get("ALC_operation")"))
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients (r -> r
.recipientFlow("'I' == headers.ALC_operation or 'U' == headers.ALC_operation",
f -> f.handle(insertUpdateAdapter))
// This element is complaining "Syntax error on token ")", ElidedSemicolonAndRightBrace expected"
// Attempted to follow patterns from https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference#routers
.recipientFlow("'D' == headers.ALC_operation",
f -> f.handle(deleteAdapter))
.defaultOutputToParentFlow())
.aggregate()
.handle(cleanupAdapter)
.get();

最新更新