Spring AMQP 消费者神秘地放弃了与队列的连接



我们使用的是 spring-amqp 1.5.2,RabbitMQ 版本 3.5.3。所有队列都工作正常,我们有消费者毫无问题地监听它们,除了一个消费者不断神秘地断开连接。Spring-AMQP Auto 恢复,但几个小时后,消费者断开连接,再也无法恢复。

队列声明为

    @Bean()
public Queue analyzeTransactionsQueue(){
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000);
    return new Queue("analyze.txns", true, false, false, args);
}

其他队列以类似的方式声明,并且没有问题。

使用者(侦听器)声明为

    @Bean
public SimpleRabbitListenerContainerFactory analyzeTransactionListenerContainerFactory(ConnectionFactory connectionFactory, AsyncTaskExecutor asyncTaskExecutor) {
connectionFactory.getVirtualHost());
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(2);
    factory.setMaxConcurrentConsumers(4);
    factory.setTaskExecutor(asyncTaskExecutor);
    ConsumerTagStrategy consumerTagStrategy = new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            return queue;
        }
    };
    factory.setConsumerTagStrategy(consumerTagStrategy);
    return factory;
}

同样,其他没有问题的消费者也以类似的方式声明。

收到消息后的代码没有异常。即使在为 SimpleMessageListenerContainer 打开 DEBUG 日志记录后,日志中也没有错误。

LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Cancelling Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0; 
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Idle consumer terminating: Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0; 

关于为什么会发生这种情况的任何想法。尝试过调试日志记录,但无济于事。

我观察到的一件事是,如果在解析过程中出现异常,消费者会断开连接,并且它并不总是记录问题,具体取决于您的日志记录配置...

从那时起,我总是将句柄交付方法包装成 try catch,以获得更好的日志记录并且没有连接丢弃:

consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            log.info("processing message - content : " + new String(body, "UTF-8"));
           try {
                MyEvent myEvent = objectMapper.readValue(new String(body, "UTF-8"), MyEvent.class);
                processMyEvent(myEvent);
            } catch (Exception exp) {
                log.error("couldn't process "+MyEvent.class+" message : ", exp);
            }
        }
    }; 

查看您配置事物的方式,很明显您已经启用了消费者的动态扩展。

factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);

我提交了一个修复程序,导致使用者数量降至零,这是一个线程问题。这是在消费者缩小规模时发生的。

从表面上看,您一直是该问题的受害者。我相信该修复程序已被向后移植,可以在这里看到

尝试使用最新版本,看看是否遇到同样的问题。

最新更新