如何创建连接jdbc:inbound通道适配器和jdbc:outbound通道适配器的Spring Integratio



我想写一个示例应用程序来创建一个复制表

我的配置如下作为第1步:我测试了入站通道适配器以写入jdbcMessageHandler。这是正常的

步骤2:将出站连接到入站。以下是关于此步骤的内容。

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">
<int-jdbc:inbound-channel-adapter id="dataChannel"
query="select * from articles where sent = 0" 
update="update articles set sent = 1 where id in (:id)"
data-source="dataSource" row-mapper="articleRowMapper">
<int:poller fixed-rate="10000">
<int:transactional />
</int:poller>
</int-jdbc:inbound-channel-adapter>

<!-- <int:service-activator input-channel="dataChannel" ref="jdbcMessageHandler" /> 
<bean id="jdbcMessageHandler" class="com.demo.service.JdbcMessageHandler" /> -->

<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<int:poller default="true" fixed-rate="10000" />
<int:channel id="dataChannel">
<int:queue />
</int:channel>

<bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/test" />
<property name="username" value="demo" />
<property name="password" value="password" />
</bean>
<bean id="articleRowMapper" class="com.demo.domain.ArticleRowMapper" />
<int-jdbc:outbound-channel-adapter id="jdbcOutbound"
channel="dataChannel"
data-source="dataSource"
sql-parameter-source-factory="sqlParameterSource"
query="INSERT INTO ARTICLES(ID, NAME, CATEGORY , TAGS , AUTHOR , SENT) VALUES(:id, :name, :category,:tags,:author,:sent)"/>
<bean id="sqlParameterSource"   class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="payload.id"/>
<entry key="name" value="payload.name"/>
<entry key="category" value="payload.category"/>
<entry key="tags" value="payload.tags"/>
<entry key="author" value="payload.author"/>
<entry key="sent" value="payload.sent"/>                    
</map>
</property>
</bean>

错误:

严重:org.springframework.messaging.MessageHandlingException:错误在消息处理程序中发生[org.springframework.integration.jdbc.JdbcMessageHandler#0];嵌套的例外情况是org.springframework.dao.TransientDataAccessResourceException:PreparedStatementCallback;SQL[插入到文章中(ID、NAME、,CATEGORY,TAGS,AUTHOR,SENT(VALUES(?,?,??,?(无效参数值:java.io.NotSerializableException;嵌套异常为java.sql.SQLException:参数值无效:java.io.NotSerializableException,failedMessage=GenericMessage[有效载荷=[com.demo.domain.Article@49023a31,com.demo.domain.Article@2d25f418,com.demo.domain.Article@31d44fe8],headers={id=1c16e823-b70e-fa1-2ea5-5475e3f8dbde,时间戳=1661550341366}]org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153(

我认为一个列表来自入站,需要由出站正确处理(只需要1条消息?(

如何管理这个列表?

我正在使用相同的数据通道,我想避免这种情况。(有更多松动的联轴器(。我能做些什么来实现这一点?

感谢

请确保使用最新的Spring Integration版本。批更新支持是很久以前添加的:https://docs.spring.io/spring-integration/docs/current/reference/html/jdbc.html#batch-更新。您也可以在两者之间添加一个splitter以逐个插入。我相信大多数驱动程序都会正确地将字符串参数映射到CLOB,所以我们可能不需要PrepareDStatementSetter特性。尽管您仍然可以通过prepared-statement-setter属性进行设置。

感谢@ArtemBilan的拆分器建议

注意:有重构的空间,但这只是一个Poc

我的配置

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">

<int:channel id="testchannel"/>

<int:inbound-channel-adapter channel="testchannel" method="produce"  ref="producer">        
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
<bean id="producer" class="com.demo.Producer"/>

<int-jdbc:outbound-channel-adapter id="jdbcOutboundCreator" 
channel="testchannel"                                           
data-source="dataSource"
sql-parameter-source-factory="sqlParameterSource2"
query="INSERT INTO articleslob(ID, NAME, CATEGORY , TAGS , AUTHOR , sent, msg ) VALUES(:id, :name, :category,:tags,:author,:sent,:msg)"

/>
<bean id="sqlParameterSource2"   class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="payload.id"/>
<entry key="name" value="payload.name"/>
<entry key="category" value="payload.category"/>
<entry key="tags" value="payload.tags"/>
<entry key="author" value="payload.author"/>
<entry key="sent" value="payload.sent"/>
<entry key="msg" value="payload.msg" />
</map>
</property>
</bean>

<!-- <entry key="msg" value="new org.springframework.jdbc.core.support.SqlLobValue(payload.msg)" /> -->
<int:channel id="inProgressChannel">
<int:queue />
</int:channel>
<int-jdbc:inbound-channel-adapter id="inProgressChannel"
query="select * from articleslob where sent = 0" 
update="update articleslob set sent = 1 where id in (:id)"
data-source="dataSource" row-mapper="articleRowMapper">
<int:poller fixed-rate="100">
<int:transactional />
</int:poller>
</int-jdbc:inbound-channel-adapter>

<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<int:poller default="true" fixed-rate="1000" />

<bean id="articleSplitterBean" class="com.demo.service.SplitterService"/>
<int:splitter id="testSplitter" input-channel="inProgressChannel" ref="articleSplitterBean" output-channel="outChannel">
</int:splitter>
<int:channel id="outChannel">
<int:queue />
</int:channel>
<bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/test" />
<property name="username" value="demo" />
<property name="password" value="password" />
</bean>
<bean id="articleRowMapper" class="com.demo.domain.ArticleRowMapper" />


<int-jdbc:outbound-channel-adapter id="jdbcOutbound"    
channel="outChannel"                                        
data-source="dataSource"
sql-parameter-source-factory="sqlParameterSource"
query="INSERT INTO articlescopy(ID, NAME, CATEGORY , TAGS , AUTHOR , sent , msg ) VALUES(:id, :name, :category,:tags,:author,:sent , :msg)"
/>
<bean id="sqlParameterSource"   class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="payload.id"/>
<entry key="name" value="payload.name"/>
<entry key="category" value="payload.category"/>
<entry key="tags" value="payload.tags"/>
<entry key="author" value="payload.author"/>
<entry key="sent" value="payload.sent"/>                
<entry key="msg" value="payload.msg" />
</map>
</property>
</bean>

我的拆分器

public List<Article> split(Message<?> message) {

return (List<Article>)message.getPayload();

}

修复1:使用拆分器拆分带有列表的邮件->消息列表

修复2:我本来不打算为send列设置值(所以没有为send设置get/set(,但我创建了包括sendwithpayload.sent的映射。这就是错误所在。

相关内容

  • 没有找到相关文章