使用TransactionManager(JMS,数据库)弹出多个事务



我有一个方法,要在其中执行两个事务:一个使用DB,一个使用JMS。我希望一个接一个的承诺。我正在尝试使用PlatformTransactionManager。有两种方法:使用TransactionTemplateDefaultTransactionDefinition。但我没有找到一个使用过多次的例子。我想做的是:

void do(){
T dbTransaction = ...; // here goes: new TransactionTemplate(transactionManager) two times
T jmsTransaction = ...; // or: new DefaultTransactionDefinition() and transactionManager.getTransaction(definition); two times
saveDb();
sendJms();
dbTransaction.commit();
jmsTransaction.commit();
}

但我不确定该使用什么以及如何使用,因为在这篇文章中,它说:

无论如何,一旦我们使用配置创建了TransactionTemplate,所有事务都将使用该配置执行。因此,如果我们需要多个配置,我们应该创建多个模板实例。

那么,如何正确创建两个transaction并相继关闭?我应该创建两个单独的definitions还是可以重用一个?我可以在两个templates中重复使用相同的transactionManager吗?我知道DB有一个@Transcational注释,我也可以将JMS配置为使用transactions,但是:

  1. 我没有找到如何配置JMS以使用事务的好例子
  2. 我不确定他们会在哪一天关门

所以我想手动执行。我也不确定这个手动事务是否能使用JMS(例如IBM-MQ),因为我只看到了数据库事务的示例。

您的用例简单而常见。您希望发送JMS消息,等待消息完成,然后提交到数据库。这是在两个事务上完成的——一个用于JMS消息,另一个用于数据库。这些事务都存在于单个事务上下文中。启动JMS事务时,将不存在任何事务上下文,因此将创建一个事务上下文。启动数据库事务时,它将加入现有的事务上下文。这些事务将被同步,因为JMS事务必须在提交数据库事务之前成功完成。

此操作的核心是事务管理器。查看您链接的文章,他们多次引用PlatformTransactionManager。在您的用例中,PlatformTransactionManager必须是一个支持JTA的事务管理器。JTA事务管理器将能够创建事务上下文并注册和同步事务。

请注意,这是两个本地事务,这不是XA或分布式事务。在这种情况下,如果JMS本地事务失败,那么数据库本地事务将被回滚。更具体地说,是事务上下文被标记为仅回滚。如果发生任何未处理的异常,则事务上下文标记为仅回滚。在本地事务上调用commit()的任何尝试都将失败,并显示一条消息,说明事务上下文仅为回滚。

实现这一目标取决于平台。例如,如果您的Spring项目部署在JBoss等应用程序服务器上的WAR文件中,那么PlatformTransactionManager将自动连接。如果您使用的是Spring Boot,那么大多数配置甚至不包括事务管理器。

我在这里有一个事务性的Spring JMS和Camel for Spring Boot。这是IBM MQ的一个简单消息桥。如果没有其他内容,那么Spring JMS和注释以及事务性IBM MQ的示例应该很有用。也许Camel钻头也有用。

注意,pom.xml文件包含:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-narayana</artifactId>
</dependency>

这个Spring Boot启动器将安装并配置Arjuna JTA事务管理器作为PlatformTransactionManager

在我的例子中,我有:

<logger name="com.arjuna" level="TRACE" additivity="false">
<appender-ref ref="STDOUT" />
</logger>

这为ArjunaJTA事务管理器提供了非常好的日志记录。

最重要的是,获得一个配置为PlatformTransactionManager的JTA事务管理器。使用它可以创建事务上下文,并在该上下文中同步两个本地事务。

示例项目应该易于运行。日志记录内容丰富。

在这种特殊情况下,您到底为什么希望使用JMS事务还不清楚,我甚至反对它——至少正如您在上面介绍的那样。

基本上,您希望在状态成功存储到数据库后发布消息。

既然你的真相来源于数据库,为什么不把所有后续行动都建立在成功完成的基础上呢?

例如,构建它的一种方法是这样的(面向Spring,因为您已经提到您正在使用它):

  1. 创建一个作用域为当前事务的JmsSenderbean。这可以通过实现BeanFactoryPostProcessor并执行以下操作来实现:
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SimpleTransactionScope transactionScope = new SimpleTransactionScope();
// The scope exists, but is not registered by Spring by default.
beanFactory.registerScope("transaction", transactionScope);
}
// in a separate configuration class defining your JmsSender bean
@Bean
@Scope("transaction")
public JmsSender jmsSender() { return new JmsSender(); }
  1. 每次调用此bean的send()方法时,都会向内部队列中添加一条消息。这通常是一个ThreadLocal<List<T>>——事实上,Spring处理事务管理的方式基本相同
  2. 创建一个AfterCommitJmsPublisherbean,它是TransactionSynchronizationAdapter——这意味着我们希望在提交时有额外的行为
  3. 注册AfterCommitJmsPublisher。这意味着在事务之前调用TransactionSynchronizationManager.registerSynchronization(jmsPublisher)。使用例如Aspects、声明性事务管理(@Transactional)和SpringAOP来实现这一点的一种方法是:
@Aspect
@Component
public class AfterCommitJmsPublisher extends TransactionSynchronizationAdapter {
private final JmsPublisher;
@Pointcut("@annotation(org.springframework.transaction.annotation.Transactional)")
private void transactionalPointcut() {
}
@Before("transactionalPointcut()")
public void registerTransactionSynchronization() {
TransactionSynchronizationManager.registerSynchronization(this);
}
  1. 提交数据库事务后,调用类似jmsPublisher.publish()的东西。这可以在TransactionSynchronizationAdapterafterCommit()方法中完成:
// In AfterCommitJmsPublisher
@Override
public void afterCommit() {
jmsPublisher.publish();
}
  1. 如果事务已回滚,则调用类似jmsPublisher.clear()的东西。您可能不想发布任何有关失败操作的消息

这样,JMS消息总是绑定到它们产生的事务-如果数据库事务失败,则不会发送任何消息。

离开你的评论:

在手动场景中,如果JMS失败,它将失败,但如果JMS没有异常,它将保存到DB中,即使在JMS事务失败后,我也可以接受,因为我将状态保存在DB中。

这可能足以满足您的需求。但是,您可能需要考虑到,组件越多,系统需要的容错能力就越强,并考虑到可能不可用的外部服务。

这可能意味着将JMS消息保存在一个特殊的数据库表中(作为事务的一部分!),并且只在成功提交后发布,在成功发布后删除保存的消息。如果不成功,您可以执行管家任务,重新尝试发布您的消息。

最后,谈谈分布式事务:就我个人而言,如果可能的话,我建议不要使用它们,尤其是对于您当前的用例。它们是复杂的野兽,几乎肯定会影响应用程序的可用性,并增加事务中所有流程的端到端延迟。类似Saga模式的东西通常更适合分布式系统。

当然,这可能不适用于您的用例,并且您的一致性要求可能超过任何可用性要求,所以对此持谨慎态度。

最新更新