逐行读取并推送到队列,在node.js中异步进行



我是整个异步/承诺概念的新手,并试图找出如何优化我在node.js中编写的lambda并行执行操作。

我正在从一个长文件(大约100k行)逐行读取,我想为每一行推送消息到SQS。在函数完成执行之前,我需要等待所有这些进程返回。

下面的代码工作,但它似乎在写入队列的内部承诺开始执行之前读取文件中的每一行,然后在最后,它将所有消息推送到队列,因为调用Promise.all

是否有办法并行运行这些操作?文件读取必须是顺序的,但我希望看到对队列的调用混合在那里。

exports.queueUpdates = async (filePath) => {
return new Promise((resolve, reject) => {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity
});
var queuePromises = [];
rl.on('line', (line) => {
console.log("read line", line);
var message = exports.queueMessageForLine(line); // This function returns the message to be sent to SQS as a JSON Object.
if (message !== null) {
console.log("Pushing message", message);
queuePromises.push(
sqs.sendMessage(message).promise()
.then((result) => {
console.log("Enqueued message", message, result);
return message;
})
.catch((err) => {
console.error("Failed adding message to queue", err);
return message;
})
);
}
}).on('close', () => {
console.log("file read");
Promise.all(queuePromises).then((results) => {
resolve(results)
})
}).on('error', (err) => {
console.error(err, "Error in reading the file contents");
reject();
});
});
};

如果它正在做我想要的事情,我希望这里的输出是这样的:

read line, line 1
read line, line 2
Enqueued message, message 1
read line, line 3
Enqueued message, message 2
Enqueued message, message 3
etc.

全部混合在一起。

经过更多的调查,这实际上是我想要的方式。这个问题是James在评论中提到的——readline批量抽取多行,然后分派事件。当在本地测试一个较小的文件时,它只是没有足够的内容来超出一次读取-导致顺序执行。在将其推送到AWS并读取完整的100k+行文件后,它显然以正确的顺序执行。

最新更新