Spring 集成:从 JMS 读取 -> 处理消息 -> 持久化在数据库中



我使用 Spring Integration 从 JMS 读取消息,处理它们,然后使用我自己的 dbPersistor 的持久化方法将它们持久化到数据库中,该方法具有返回类型 void。我编写了一个测试用例来验证发布到 JMS 的消息是否已成功持久化到数据库中。对于此测试,我的 SI 和 JMS 配置如下 -

<int:poller fixed-delay="500" default="true"/>
<int:channel id="inputChannel">
<int:queue/>
</int:channel>
<int:channel id="errorChannel">
<int:queue/>
</int:channel>
<jms:message-driven-channel-adapter id="jmsInboudAdapter"         
connection-factory="connectionFactory" destination-name="MessageQueue" 
channel="inputChannel" error-channel="errorChannel" transaction-manager="dbTxManager"                                    
acknowledge="transacted"/>
<int:chain id="handlerChain" input-channel="inputChannel">
<int:service-activator ref="jmsMessageHandler" method="handleMessage" />
<int:service-activator ref="dbPersistor" method="persist" />   
</int:chain>

然后在测试中我执行以下操作 -

  • jmsTemplate.send()
  • verifyMessageWasPersistedToDB

当我只将一条消息发布到数据库时,这很好用。但是当我遍历 jmsTemplate.send() 发布多条消息时,主线程在 SI 线程仍在执行时完成操作并尝试验证数据库中的消息并失败,因为某些消息尚未持久化。我的问题是——

  1. 如何使主线程等待 SI 线程完成,然后调用验证方法?
  2. 如果发生数据库异常和回滚,如何验证失败的消息是否返回到原始队列中?

谢谢 AJ

  1. inputChannel不应该是队列通道 - 当消息插入队列时,JMS 事务将提交 - 数据库事务不会在 JMS 事务范围内执行。您必须为此使用直接通道(删除轮询器并<queue/>)。请参阅有关 Spring 集成中事务的文档。

  2. 您必须轮询数据库以获取结果;您可能会添加一个拦截器和一些 CountDownLatch,但轮询数据库直到结果出现或一段时间到期会更容易。

最新更新