发布后关闭 amqp 承诺连接



我正在尝试在发布消息后如何关闭基于承诺的连接。

我试图推断我的发送方和接收方的共享代码,所以我有一个这样的连接文件:

连接器.js

const amqp = require('amqplib');
class Connector {
  constructor(RabbitMQUrl) {
    this.rabbitMQUrl = RabbitMQUrl;
  }
  connect() {
    return amqp.connect(this.rabbitMQUrl)
      .then((connection) => {
        this.connection = connection;
        process.once('SIGINT', () => {
          this.connection.close();
        });
        return this.connection.createChannel();
      })
      .catch( (err) => {
        console.error('Errrored here');
        console.error(err);
      });
  }
}
module.exports = new Connector(
  `amqp://${process.env.AMQP_HOST}:5672`
);

然后我的发布者/发件人如下所示:

出版商.js

const connector = require('./connector');
class Publisher {
  constructor(exchange, exchangeType) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.durabilityOptions = {
      durable: true,
      autoDelete: false,
    };
  }
  publish(msg) {
    connector.connect()
      .then( (channel) => {
        let ok = channel.assertExchange(
          this.exchange,
          this.exchangeType,
          this.durabilityOptions
        );
        return ok
          .then( () => {
            channel.publish(this.exchange, '', Buffer.from(msg));
            return channel.close();
          })
          .catch( (err) => {
            console.error(err);
          });
      });
  }
}
module.exports = new Publisher(
  process.env.AMQP_EXCHANGE,
  process.env.AMQP_TOPIC
);

但如前所述,我无法弄清楚如何在拨打publish()后关闭连接。

你可以添加一个 close(( 函数到连接器:

  close() {
      if (this.connection) {
          console.log('Connector: Closing connection..');
          this.connection.close();
      }
  }

发行人:

class Publisher {
  constructor(exchange, exchangeType) {
    this.exchange = exchange;
    this.exchangeType = exchangeType;
    this.durabilityOptions = {
      durable: true,
      autoDelete: false,
    };
  }
  connect() {
      return connector.connect().then( (channel) => {
         console.log('Connecting..');
         return channel.assertExchange(
          this.exchange,
          this.exchangeType,
          this.durabilityOptions
        ).then (() => {  
            this.channel = channel;
            return Promise.resolve();
        }).catch( (err) => {
            console.error(err);
        });;
      });
  }
  disconnect() {
      return this.channel.close().then( () => { return connector.close();});
  }
  publish(msg) {
      this.channel.publish(this.exchange, '', Buffer.from(msg));      
  };
}

测试.js

'use strict'
const connector = require('./connector');
const publisher = require('./publisher');

publisher.connect().then(() => { 
    publisher.publish('message');
    publisher.publish('message2');
    publisher.disconnect();
});

最新更新