Node.js应用程序侦听消息队列并将消息异步添加到 redis



我正在开发一个节点.js组件,该组件侦听消息队列(ActiveMQ(并将收到的消息批量添加到redis中(每批必须为20个(。

当从 ActiveMQ 接收的消息数为每秒 10 条或更少时,没有问题。

我的问题是消息每 4 毫秒添加到队列中。这会导致添加到批处理中的记录数有时每个批处理超过 20 条。

const stompit = require('stompit');
var uuid = require('node-uuid');
var Redis = require('ioredis');
var redis = new Redis();
var pipeline = redis.pipeline();
var batchCounter = 0;
stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) {
client.subscribe({ destination: 'MyQueue' }, function(err2, msg) {
    msg.readString('UTF-8', function(err3, body) {
        if (batchCounter >= 20){
            pipeline.exec(function(err4, results) {
                pipeline = redis.pipeline();
                batchCounter = 0;
                console.log(results);
            });
        }
        batchCounter++;
        pipeline.set(uuid.v1(), JSON.stringify(body));
        //client.disconnect();
      });
  });
  });

如何解决这个问题?谢谢

在调用 .exec 方法之前尝试重置管道,我认为这是一种异步方法。由于.exec在将来的某个时间点运行,因此增量和pipeline.set可以在它之前运行。

以下代码保存当前管道,并在.exec之前同步创建一个新管道

if (batchCounter >= 20){
    let fullpipeline = pipeline;
    pipeline = redis.pipeline();
    batchCounter = 0; 
    fullpipeline.exec(function(err4, results) {
        console.log(err4, results);
    });
}

然后,新消息应仅追加到新管道。

我最终使用标志控制了每批的记录数。我相信还有另一种可能的,可能更有效的解决方法,即控制从ActiveMQ读取的过程。但是,在这种情况下,标志为我完成了这项工作。使用标志的完整代码位于以下链接中:

https://github.com/TamerB/ToActiveMQToRedis/blob/master/consumer/app.js

最新更新