我们使用的是 spring-amqp 1.5.2,RabbitMQ 版本 3.5.3。所有队列都工作正常,我们有消费者毫无问题地监听它们,除了一个消费者不断神秘地断开连接。Spring-AMQP Auto 恢复,但几个小时后,消费者断开连接,再也无法恢复。
队列声明为
@Bean()
public Queue analyzeTransactionsQueue(){
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);
return new Queue("analyze.txns", true, false, false, args);
}
其他队列以类似的方式声明,并且没有问题。
使用者(侦听器)声明为
@Bean
public SimpleRabbitListenerContainerFactory analyzeTransactionListenerContainerFactory(ConnectionFactory connectionFactory, AsyncTaskExecutor asyncTaskExecutor) {
connectionFactory.getVirtualHost());
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
factory.setTaskExecutor(asyncTaskExecutor);
ConsumerTagStrategy consumerTagStrategy = new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue;
}
};
factory.setConsumerTagStrategy(consumerTagStrategy);
return factory;
}
同样,其他没有问题的消费者也以类似的方式声明。
收到消息后的代码没有异常。即使在为 SimpleMessageListenerContainer 打开 DEBUG 日志记录后,日志中也没有错误。
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Cancelling Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0;
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Idle consumer terminating: Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0;
关于为什么会发生这种情况的任何想法。尝试过调试日志记录,但无济于事。
我观察到的一件事是,如果在解析过程中出现异常,消费者会断开连接,并且它并不总是记录问题,具体取决于您的日志记录配置...
从那时起,我总是将句柄交付方法包装成 try catch,以获得更好的日志记录并且没有连接丢弃:
consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("processing message - content : " + new String(body, "UTF-8"));
try {
MyEvent myEvent = objectMapper.readValue(new String(body, "UTF-8"), MyEvent.class);
processMyEvent(myEvent);
} catch (Exception exp) {
log.error("couldn't process "+MyEvent.class+" message : ", exp);
}
}
};
查看您配置事物的方式,很明显您已经启用了消费者的动态扩展。
factory.setConcurrentConsumers(2); factory.setMaxConcurrentConsumers(4);
我提交了一个修复程序,导致使用者数量降至零,这是一个线程问题。这是在消费者缩小规模时发生的。
从表面上看,您一直是该问题的受害者。我相信该修复程序已被向后移植,可以在这里看到
尝试使用最新版本,看看是否遇到同样的问题。