nodejs等待,直到循环中的所有MongoDB调用完成



我正在逐行从CSV文件中读取数据流,并在每行上调用findOne MongoDB调用,如何等到每行的所有mongo调用完成后再运行下一个函数?

我见过Promises能做到吗?但我发现"承诺"极难理解。我发现的例子似乎都没有涵盖我正在尝试的内容

var validProducts = [];
fs.createReadStream(req.file.path)
  .pipe(csvStream)
  .on('error', function (err) {
    console.error(err);
  })
  // loop through all rows
  .on('data', function (data) {
    if (data.size === 'a3') {
      ProductModel.findOne({ sku: data.sku }, function (err, product) {
        if (product !== null) {
          product.size = data.size;
          product.save();
          validProducts.push(product);
        }
      });
    }
  });
// on finish make call to other function
socket.emit({ 'status': 'complete' });
otherFunction(validProducts);

on('finish')on('end')将只在数据流的末尾调用,而不是在Monogo调用之后调用。

如果我可以使用承诺,有人能解释一下如何使用吗?

您可以使用Q API开关来做出承诺。有一个有趣的函数允许您等待一系列承诺得到解决。以下是如何使用Q.all:解决问题的示例

var validProducts = [];
var promises = [];
function handleData(data) {
    if (data.size === 'a3') {
        var deferred = Q.defer();
        ProductModel.findOne({ sku: data.sku }, function (err, product) {
            if (err) {
                deferred.reject(new Error(err));
            }
            if (product) {
                product.size = data.size;
                product.save();
                deferred.resolve(product);
                validProducts.push(product);
            }
        });
        promises.push(deferred.promise);
    }
}
function handleEnd() {
    Q.all(promises).done(function (values) {
        socket.emit({ 'status': 'complete' });
        otherFunction(validProducts);
    });
}
fs.createReadStream(req.file.path)
  .on('data', handleData)
  .on('end', handleEnd);

使用暂停/恢复

  .on('data', function (data) {
    if (data.size === 'a3') {
      this.pause(); // pause it
      var stream = this; // save 'this'
      ProductModel.findOne({ sku: data.sku }, function (err, product) {
        if (product !== null) {
          product.size = data.size;
          product.save();
          validProducts.push(product);
        }
        stream.resume(); //resume it
      });
    }
  });

最新更新