优化RabbitMQ消费者以批量消费



我有一个应用程序,在每次消息消费时,我都需要查询MySQL数据库中的一些信息,并在此基础上处理消费的消息。我希望对此进行优化,以防止数据库上的多个查询增加负载。

我正在考虑一种方法,即至少等待x条消息y秒。通过这种方式,我可以批量消费一些消息,即使在某个时候我收到的消息更少,它们也会被消费。

示例:假设x=100y=10秒

这意味着我至少要等待100条消息或10秒,以先到者为准。通过这种方式,我可以在一个查询中一次查询数据库100条消息。此外,如果我收到的消息少于100条,剩余的消息将在最长10秒的窗口内处理。

我使用NodeJS和amqplib进行消费。我有以下基于RabbitMQ示例的代码:

amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'hello';
ch.assertQueue(q, {durable: false});
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) {
console.log(" [x] Received %s", msg.content.toString());
}, {noAck: true});
});
});

我想有一个全局对象,并在每个consume回调中添加它,并在它到达被处理的x消息时检查该对象的计数。尽管如此,我不知道如何在此基础上添加y秒的时间上限,并确保如果我在时间窗口内收到的消息少于x条,则这些消息将被处理

以下代码将在每个收到的消息后调用一个函数,该函数将收到的消息聚合到一个数组中。当在没有消息的情况下调用它(参数为null(时,或者当它发现消息计数已达到x时,它会将聚合消息发送到数据库函数。否则,它只需将消息添加到数组中(在if语句的第二部分中(。

参数null由计时器传递给聚合函数,计时器在y秒后启动。此计时器在消息队列刚刚初始化时首次设置,并且在聚合器向数据库发送消息时重置。

var messageStore = [];
var timer;
sendToDatabase = function(messages) {...}
aggregate = function(msg) {
if (msg == null || messageStore.push(msg) == x) {
clearTimeout(timer);
timer = setTimeout(aggregate, 1000*y, null);
sendToDatabase(messageStore);
messageStore = [];
}
}
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'hello';
ch.assertQueue(q, {durable: false});
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
timer = setTimeout(aggregate, 1000*y, null);
ch.consume(q, function(msg) {
console.log(" [x] Received %s", msg.content.toString());
aggregate(msg);
}, {noAck: true});
});
});

注意:我还没能测试这个,因为我手头没有消息系统。

相关内容

  • 没有找到相关文章

最新更新