我对rabbitmq。
我想知道当发送";pub";消息,我需要关闭与通道的连接和amqpblib连接,与消费者相同?或者正确的做法是保持连接打开?
我有这个酒吧和酒吧:
public produce = async <T>(queue: string, message: T): Promise<boolean> => {
try {
if (!this.connection) await this.start();
await this.initChannel(queue);
const sendResult = this.channel.sendToQueue(queue, Buffer.from(message), {
persistent: true,
});
if (!sendResult) {
await new Promise(resolve => this.channel.once('drain', () => resolve));
}
return sendResult;
} catch (error) {
Logger.info(error.message);
return false;
} finally {
this.close();
}
};
订阅
public subscribe = async (
queue: string,
onMessage: (msg: IMessage) => boolean,
): Promise<void> => {
if (!this.connection) await this.start();
const channel = await this.initChannel(queue);
channel.consume(queue, message => {
if (!message) return false;
const body = <IMessage>JSON.parse(message.content.toString());
if (body && onMessage(body)) onMessage(body);
channel.ack(message);
});
};
这个到初始化连接和listenner事件:
private start = async () => {
try {
this.connection = await connect(this.rabbitUrl);
Logger.info('connect to RabbitMQ success');
await this.listeners();
} catch (err) {
Logger.info(err.message);
sleep(this.start, 10000);
}
};
private listeners = async (): Promise<Connection> => {
return (
this.connection.on('error', (err: Error) => {
Logger.info(err.message);
sleep(this.start, 10000);
}) &&
this.connection.on('close', () => {
Logger.info('connection to RabbitQM closed!');
sleep(this.start, 10000);
})
);
};
在大多数客户端中,创建连接成本很高,通道很轻,但线程不安全。
对于Node.js应用程序(单线程模型(,理想情况下publish
和subscribe
只有两个连接,每个连接一个通道。
所以请保持连接和通道畅通。