Spring集成:如何在MessageChain中间实现JDBCOutboundGateway



对我来说,这似乎是一个简单的问题,可能会在各地复制。MessageHandlerChain的一个非常基本的应用程序,可能只使用开箱即用的功能。

从概念上讲,我需要的是:

(1) Polled JDBC reader (sets parameters for integration pass)
|
V
(2) JDBC Reader (uses input from (1) to fetch data to feed through channel
|
V
(3) JDBC writer (writes data fetched by (2) to target)
|
V
(4) JDBC writer (writes additional data from the original parameters fetched in (1))

我想我需要的是

Flow:
From: JdbcPollingChannelAdapter (setup adapter)
Handler: messageHandlerChain
Handlers (
JdbcPollingChannelAdapter (inbound adapter)
JdbcOutboundGateway (outbound adapter)
JdbcOutboundGateway (cleanup gateway)
)

JdbcPollingChannelAdapter没有实现MessageHandler API,因此我不知道如何根据设置步骤读取实际数据。

由于JdbcOutboundGateway没有实现MessageProducer API,因此对于出站适配器需要使用什么,我有点不知所措。

我应该使用OOB类吗?或者,我需要以某种方式将两个适配器封装在BridgeHandlers中以使其工作吗?

提前感谢


编辑(2(附加配置问题

安装适配器正在向后拉一行,其中包含两个时间戳列。它们正由";富集报头";块

但是,当入站适配器执行时,框架会将java.lang.Object作为参数传入。不是String,也不是Timestamp,而是new Object ()中的一个实际java.lang.Object。

它传递了正确数量的对象,但内容和数据类型都丢失了。我认为需要配置ExpressionEvaluatingSqlParameterSourceFactory是否正确?

消息:

GenericMessage [payload=[{startTime=2020-11-18 18:01:34.90944, endTime=2020-11-18 18:01:34.90944}], headers={startTime=2020-11-18 18:01:34.90944, id=835edf42-6f69-226a-18f4-ade030c16618, timestamp=1605897225384}]

JdbcOutboundGateway中的SQL:

Select t.*, w.operation as "ops" from ADDRESS t
Inner join TT_ADDRESS w 
on (t.ADDRESSID = w.ADDRESSID)
And (w.LASTUPDATESTAMP >= :payload.from[0].get("startTime") and w.LASTUPDATESTAMP <= :payload.from[0].get("endTime") )

编辑:添加了解决方案java DSL配置

private JdbcPollingChannelAdapter setupAdapter; // select only
private JdbcOutboundGateway inboundAdapter; // select only
private JdbcOutboundGateway insertUpdateAdapter; // update only
private JdbcOutboundGateway deleteAdapter; // update only
private JdbcMessageHandler cleanupAdapter; // update only
setFlow(IntegrationFlows
.from(setupAdapter, c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload.from[0].get("ALC_startTime")")
.headerExpression("ALC_endTime", "payload.from[0].get("ALC_endTime")"))
.handle(inboundAdapter)
.enrichHeaders(h -> h.headerExpression("ALC_operation", "payload.from[0].get("ALC_operation")"))
.handle(insertUpdateAdapter)
.handle(deleteAdapter)
.handle(cleanupAdapter)
.get());
flowContext.registration(flow).id(this.getId().toString()).register();

如果您想将原始参数传送到流中的最后一个网关,则需要将这些参数存储在标头中,因为在每一步之后,回复消息的有效负载都将不同,并且您将不再有原始设置数据。这是第一个。

第二:如果您处理IntegrationFlow和JavaDSL,则不必担心messageHandlerChain,因为从概念上讲,IntegrationFlow本身就是一个链,但更先进。

我不知道为什么您需要使用JdbcPollingChannelAdapter根据流开始时来自源的传入消息按需请求数据。

对于SELECT模式,您肯定仍然需要使用JdbcOutboundGatewayupdateQuery是可选的,所以网关只需要执行SELECT并在回复消息的有效负载中为您返回数据。

如果你接下来的两个步骤只是";写";如果你不关心结果,你可能只需要看看一个PublishSubscribeChannel和两个JdbcMessageHandler作为它的订阅者。如果没有为PublishSubscribeChannel提供Executor,它们将被逐一执行。

最新更新