Spring AMQP 同步事务回滚不起作用。在这里,源中的数据库事务不是由 Spring 处理的。我需要在一个事务中接收和发送Spring AMQP消息。以下是相关代码的快照。如果您需要其他任何内容,请告诉我。
/////Connection Factory initialization
@Bean
public ConnectionFactory getConnectionFactory() {
System.out.println("hello");
configManager();
String ip = ConfigManager.getQueueServerHost();
System.out.println("IP Address : "+ip);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(ip);
connectionFactory.setUsername(ConfigManager.getQueueUserName());
connectionFactory.setPassword(ConfigManager.getQueuePassword());
connectionFactory.setPort(ConfigManager.getQueueServerPort());
//connectionFactory.setPublisherReturns(true);
//connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
/////Rabbit Template initialization
@Bean
public RabbitTemplate producerRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
rabbitTemplate.setRoutingKey(ConfigManager.getProducerQueueName());
rabbitTemplate.setQueue(ConfigManager.getProducerQueueName());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
/////Transactional Code
@Transactional(readOnly=false, rollbackFor=Exception.class)
public void processFile(RabbitTemplate rabbitTemplate)throws Exception{
rabbitTemplate.setRoutingKey(ConfigManager.getConsumerQueueName());
rabbitTemplate.setQueue(ConfigManager.getConsumerQueueName());
Object messageObj = rabbitTemplate.receiveAndConvert();
Message message = null;
try{
if(messageObj != null){
if (messageObj instanceof Message){
message = (Message)messageObj;
System.out.println("Message received is '" + message.getFileName() + "' for Hospital "+message.getHospitalId());
String newFileName = this.process(message.getFileName(), message.getHospitalId());
this.sendMessage(newFileName, message.getHospitalId());
}else{
System.out.println("Unknown message received '" + messageObj + "'");
}
}
}catch(Exception e){
e.printStackTrace();
throw e;
}
}
它对我来说很好用;我刚刚上传了一个带有测试用例的 Gist,显示它可以工作。
我建议您打开TRACE
级别日志记录以查看所有事务活动(并将其与我放入 Gist 中的日志进行比较)。