我使用 Spring Integration 从 JMS 读取消息,处理它们,然后使用我自己的 dbPersistor 的持久化方法将它们持久化到数据库中,该方法具有返回类型 void。我编写了一个测试用例来验证发布到 JMS 的消息是否已成功持久化到数据库中。对于此测试,我的 SI 和 JMS 配置如下 -
<int:poller fixed-delay="500" default="true"/>
<int:channel id="inputChannel">
<int:queue/>
</int:channel>
<int:channel id="errorChannel">
<int:queue/>
</int:channel>
<jms:message-driven-channel-adapter id="jmsInboudAdapter"
connection-factory="connectionFactory" destination-name="MessageQueue"
channel="inputChannel" error-channel="errorChannel" transaction-manager="dbTxManager"
acknowledge="transacted"/>
<int:chain id="handlerChain" input-channel="inputChannel">
<int:service-activator ref="jmsMessageHandler" method="handleMessage" />
<int:service-activator ref="dbPersistor" method="persist" />
</int:chain>
然后在测试中我执行以下操作 -
- jmsTemplate.send()
- verifyMessageWasPersistedToDB
当我只将一条消息发布到数据库时,这很好用。但是当我遍历 jmsTemplate.send() 发布多条消息时,主线程在 SI 线程仍在执行时完成操作并尝试验证数据库中的消息并失败,因为某些消息尚未持久化。我的问题是——
- 如何使主线程等待 SI 线程完成,然后调用验证方法?
- 如果发生数据库异常和回滚,如何验证失败的消息是否返回到原始队列中?
谢谢 AJ
-
inputChannel
不应该是队列通道 - 当消息插入队列时,JMS 事务将提交 - 数据库事务不会在 JMS 事务范围内执行。您必须为此使用直接通道(删除轮询器并<queue/>
)。请参阅有关 Spring 集成中事务的文档。 -
您必须轮询数据库以获取结果;您可能会添加一个拦截器和一些 CountDownLatch,但轮询数据库直到结果出现或一段时间到期会更容易。