我正在使用amqlib
模块构建一个带有rabbitmq和nodejs的后台任务管理系统。
有些任务确实会消耗 CPU,所以如果我启动很多任务并且我只有几个工作线程,我的服务器可能会被杀死(使用太多 CPU(。
我想知道是否有办法创建一个 amqp 队列,以便我的消费者一次只使用此队列中的一个任务(即在确认或拒绝之前,不要向该消费者发送此类任务(。或者我应该在代码中自己处理这个问题(也许在我的工作人员中保留一个引用,即我正在处理此队列的任务并在执行任务时拒绝此队列的所有任务?
这是我的示例代码:
我正在像那样创建 amqp 连接
const amqpConn = require('amqplib').connect('amqp://localhost');
我的队列名称是 tasks
:
amqpConn.then((conn) => {
return conn.createChannel();
}).then((ch) => {
return ch.assertQueue('tasks').then((ok) => {
ch.sendToQueue(q, new Buffer(`something to do ${i}`));
});
}).catch(console.warn);
这是我的消费者(我想这是我应该做的工作来限制这个队列的一个并发任务的地方(:
amqpConn.then((conn) => {
return conn.createChannel();
}).then((ch) => {
return ch.assertQueue('tasks').then((ok) => {
return ch.consume('tasks', (msg) => {
if (msg !== null) {
console.log(msg.content.toString());
ch.ack(msg);
}
});
});
}).catch(console.warn);
多谢!
I'm wondering if there is a way to create an amqp queue so that my consumers will only consume one task of this queue at a time
如果这是您真正需要的,那么是的,只需只有一个消费者并声明队列是独占的。这样,一次任务就会消耗掉。
我想我通过以下方式得到了它:
- 为每个队列创建通道
- 使用通道的
prefetch_count
限制每个使用者的并发性
https://www.rabbitmq.com/consumer-prefetch.html