2个JMS代理(ActiveMQ)之间的XA事务



我正在尝试在两个不同的远程activeMQ代理之间移动jms消息,并且在阅读了大量之后

我正在使用Atomikos,因为我正在编写一个独立的应用程序,我也在使用spring来让整个程序正常工作。

我有以下bean javaconfig设置

@Bean(name="atomikosSrcConnectionFactory")
public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
consumerBean.setLocalTransactionMode(false);
return consumerBean;
}
@Bean(name="atomikosDstConnectionFactory")
public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
producerBean.setLocalTransactionMode(false);
return producerBean;
}
@Bean(name="jtaTransactionManager")
public JtaTransactionManager jtaTransactionManager() throws SystemException {
JtaTransactionManager jtaTM = new JtaTransactionManager();
jtaTM.setTransactionManager(userTransactionManager());
jtaTM.setUserTransaction(userTransactionImp());
return jtaTM;
}
@Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
public UserTransactionManager userTransactionManager() {
UserTransactionManager utm = new UserTransactionManager();
utm.setForceShutdown(false);
return utm;
}
@Bean(name="userTransactionImp")
public UserTransactionImp userTransactionImp() throws SystemException {
UserTransactionImp uti = new UserTransactionImp();
uti.setTransactionTimeout(300);
return uti;
}
@Bean(name="jmsContainer")
@Lazy(value=true)
public DefaultMessageListenerContainer jmsContainer() throws SystemException {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setAutoStartup(false);
dmlc.setTransactionManager(jtaTransactionManager());
dmlc.setSessionTransacted(true);
dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
dmlc.setConnectionFactory(consumerXAConnectionFactory());
dmlc.setDestinationName("srcQueue");
return dmlc;
}
@Bean(name="transactedJmsTemplate")
public JmsTemplate transactedJmsTemplate() {
DynamicDestinationResolver dest = new DynamicDestinationResolver();
JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());
jmsTmp.setDeliveryPersistent(true);
jmsTmp.setSessionTransacted(true);
jmsTmp.setDestinationResolver(dest);
jmsTmp.setPubSubDomain(false);
jmsTmp.setReceiveTimeout(20000);
jmsTmp.setExplicitQosEnabled(true);
jmsTmp.setSessionTransacted(true);
jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTmp;
}

在我启动DMLC之前,2个AtomikosConnectionFactoryBean正在运行时包装一个ActiveMQXAConnectionFactory(每个代理一个)。

然后,我使用以下方法设置了一个简单的messageListener(在dmlc启动之前将其分配给它):

public void onMessage(Message message) {
final Message rcvedMsg = message;
try{
MessageCreator msgCreator = new MessageCreator(){
public Message createMessage(Session session) throws JMSException{
Message returnMsg = null;
if(rcvedMsg instanceof TextMessage){
TextMessage txtMsg = session.createTextMessage();
txtMsg.setText(((TextMessage) rcvedMsg).getText());
returnMsg = txtMsg;
}
else if(rcvedMsg instanceof BytesMessage){
BytesMessage bytesMsg = session.createBytesMessage();
if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
bytesMsg.writeBytes(bodyContent);
returnMsg = bytesMsg;
}
}
return returnMsg;
}
};
jmsTemplate.send(msgCreator);
}
catch(JmsException | JMSException e){
logger.error("Error when transfering message: '{}'. {}",message,e);
}
}

应用程序启动时没有任何特定的错误或警告,但当我将消息放入源队列时,我可以通过日志看到onMessage方法正在为同一消息反复运行,就好像事务一直在回滚并重新启动一样(任何地方都不会出现错误)。

我还注意到,如果我碰巧使用了相同的源和目标url(意思是相同的代理,但每个代理都有自己的connectionFactory),它就会正常工作,消息会在源和目标队列之间按预期传输。

我想知道的是

  1. 我在设置中做错了什么?为什么当使用两个不同的代理时,我的交易"似乎"被一次又一次地回滚,但当使用相同的代理时(但在两个不同连接工厂上)却能工作
  2. 我不完全相信onMessage目前正在进行正确的事务处理,因为我目前正在捕捉所有异常,什么都不做,我相信这将在jmstemplate发送消息之前提交dmlc的事务,但我不确定。如果是这种情况,那么SessionAwareMessageListener会更好吗?我应该在onMessage方法中设置@Transacted吗

有人能帮忙澄清这个问题吗?欢迎所有意见。

更新:

我意识到"回滚"的问题是由于我使用的两个AMQ通过代理网络相互连接,并且我碰巧使用相同的队列名称作为源和目标。这导致了应用程序将消息从一个AMQ传输到另一个,然后立即,因为源AMQ上有一个消费者,消息将被传输回原始AMQ,而我的应用程序将其视为新消息,并再次传输,循环无限继续。下面发布的解决方案有助于解决其他问题。

try {
... Code
} catch (JmsException je) {
logger.error("Error when transfering message: '{}'. {}",message,e);
}

上面的代码正在吞噬异常,您应该要么不捕获异常,要么重新抛出,以便事务管理能够适当地处理它。目前看不到任何异常,执行提交可能会导致奇怪的结果。

我想做如下的事情,JmsException来自Spring,和Spring中的大多数例外一样,是RuntimeException。只需重新trow,也可以正确地记录异常堆栈,删除日志语句中的第二个{}

try {
... Code
} catch (JmsException je) {
logger.error("Error when transfering message: '{}'.",message,e);
throw je;
}

然而,这将重复日志记录,因为Spring也会记录错误。

对于JMSException,执行类似的操作,将其转换为JmsException

try {
... Code
} catch (JMSException je) {
logger.error("Error when transfering message: '{}'.",message,e);
throw JmsUtils.convertJmsAccessException(je);
}

要获得有关发生的情况的更多信息,您可能需要为org.springframework.jms包启用DEBUG日志记录。这将使您深入了解发送/接收消息时会发生什么。

另一件事是使用事务会话和手动确认消息,但是在代码中没有执行message.acknowledge()。由于JTA事务,Spring不会调用它。请尝试将其切换为SESSION_TRANSACTED。至少对于DMLC

最新更新