Spring AMQP synchronous transaction rollback



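Synchronous transaction rollback in Spring AMQP is not working. Here, the database transaction at the source is not managed by Spring. I need to receive and send Spring AMQP messages within a single transaction. Below is a snapshot of the relevant code. Let me know if you need anything else.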

// Connection factory initialization
@Bean
public ConnectionFactory getConnectionFactory() {
    System.out.println("hello");
    configManager();
    String ip = ConfigManager.getQueueServerHost();
    System.out.println("IP Address : "+ip);
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(ip);
    connectionFactory.setUsername(ConfigManager.getQueueUserName());
    connectionFactory.setPassword(ConfigManager.getQueuePassword());
    connectionFactory.setPort(ConfigManager.getQueueServerPort());
    //connectionFactory.setPublisherReturns(true);
    //connectionFactory.setPublisherConfirms(true);
    return connectionFactory;
}
// RabbitTemplate initialization
@Bean
public RabbitTemplate producerRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
    rabbitTemplate.setRoutingKey(ConfigManager.getProducerQueueName());
    rabbitTemplate.setQueue(ConfigManager.getProducerQueueName());
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

// Transactional code
@Transactional(readOnly=false, rollbackFor=Exception.class)
public void processFile(RabbitTemplate rabbitTemplate)throws Exception{
    rabbitTemplate.setRoutingKey(ConfigManager.getConsumerQueueName());
    rabbitTemplate.setQueue(ConfigManager.getConsumerQueueName());
    Object messageObj = rabbitTemplate.receiveAndConvert();
    Message message = null;
    try{
        if(messageObj != null){
            if (messageObj instanceof Message){
                message = (Message)messageObj;
                System.out.println("Message received is '" + message.getFileName() + "' for Hospital "+message.getHospitalId());
                String newFileName = this.process(message.getFileName(), message.getHospitalId());
                this.sendMessage(newFileName, message.getHospitalId());
            }else{
                System.out.println("Unknown message received '" +  messageObj + "'");           
            }
        }
    }catch(Exception e){
        e.printStackTrace();
        throw e;
    }
}

It works fine for me; I just uploaded a Gist with a test case showing that it works.

I suggest turning on TRACE-level logging to see all the transaction activity (and comparing it with the log I put in the Gist).
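
One thing worth checking while comparing logs: `@Transactional` only has an effect when a transaction manager bean exists and annotation-driven transactions are enabled. The following is a minimal sketch of such a configuration, not the contents of the Gist. It assumes no other transaction manager is defined in the application; the class name `RabbitTxConfig` and the bean method name are hypothetical. `RabbitTransactionManager` is the Spring AMQP transaction manager that binds a transacted channel to the current thread, so that a `RabbitTemplate` with `setChannelTransacted(true)` takes part in the transaction.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;

// Hypothetical configuration class; the names here are illustrative only.
@Configuration
@EnableTransactionManagement // enables processing of @Transactional on Spring-managed beans
public class RabbitTxConfig {

    // Assumption: this is the only PlatformTransactionManager in the context.
    // It manages a transacted channel for the given connection factory, so the
    // receive and the send inside one @Transactional method commit or roll back together.
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}
```

With a setup like this, the class containing `processFile` also has to be a Spring bean and the method has to be called through that bean from outside the class; a call made from another method of the same object bypasses the transactional proxy, and no rollback occurs.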
