无法使用 Node 将大块数据填充到 mongodb.js



我被要求导入从全市许多站点收集的大量天气数据。每个站点有一台计算机,有一个文件夹,每5分钟同步到中央服务器。每天都会创建一个新文件。基本结构是这样的。一个txt文件格式为csv文件,第一行为字段,其余为数字。

folder_on_server
|__ site1 __ date1.txt
| |__ date .txt
|
|__ site2 __ date1.txt
| __ date2.txt
我写了一个小的node.js应用程序来填充这些数据到mongoDB。然而,目前,我们只有3个站点,但每个站点有近900个txt文件,每个文件包含24*20 = 288行(每5分钟记录一次数据)。我试着运行node应用程序,但是在读取了第一个文件夹的大约100个文件后,程序崩溃了,出现了内存分配失败的错误。

我已经尝试了很多方法来改善这一点:

  1. 将nodejs的内存大小增加到8GB =>好一点,更多的文件读取,但仍然无法移动到下一个文件夹
  2. 在_的末尾设置一些变量为null和未定义。forEach循环(我使用下划线)=>没有帮助。
  3. 移动文件数组(使用fs.readdir),这样第一个元素将被删除=>也没有帮助。

是否有任何方法强制js每次完成读取文件时清理内存?由于

更新1:我最终在每个文件夹中一次添加了100个文件。这似乎很乏味,但它有效,这就像一次工作。然而,我仍然想找到一个解决这个问题的方法。

就像Robbie说的,流是解决这个问题的方法。应该用fs.createReadStream()代替.readFileSync()。我将首先创建一个行读取器,它接受路径和您想要分割的任何字符串/正则表达式:

linereader.js

var fs = require("fs");
var util = require("util");
var EventEmitter = require("events").EventEmitter;
function LineReader(path, splitOn) {
    var readStream = fs.createReadStream(path);
    var self = this;
    var lineNum = 0;
    var buff = ""
    var chunk;
    readStream.on("readable", function() {
        while( (chunk = readStream.read(100)) !== null) {
            buff += chunk.toString();
            var lines = buff.split(splitOn);
            for (var i = 0; i < lines.length - 1; i++) {
                self.emit("line",lines[i]);
                lineNum += 1;
            }
            buff = lines[lines.length - 1];
        }
    });
    readStream.on("close", function() {
        self.emit("line", buff);
        self.emit("close")
    });
    readStream.on("error", function(err) {
        self.emit("error", err);
    })
}
util.inherits(LineReader, EventEmitter);
module.exports = LineReader;

这将读取一个文本文件,并为每读取一行发出"line"事件,因此您不会一次将它们全部存储在内存中。然后,使用async包(或者您想使用的任何async循环),遍历插入每个文档的文件:

app.js

var LineReader = require("./linereader.js");
var async = require("async");
var paths = ["./text1.txt", "./text2.txt", "./path1/text3.txt"];
var reader;
async.eachSeries(paths, function(path, callback) {
    reader = new LineReader(path, /n/g);
    reader.on("line", function(line) {
        var doc = turnTextIntoObject(line);
        db.collection("mycollection").insert(doc);
    })
    reader.on("close", callback);
    reader.on("error", callback);
}, function(err) {
    // handle error and finish;
})

尝试使用流而不是将每个文件加载到内存中。

我已经给你发送了一个使用流和异步I/o实现的拉请求。

这是它的大部分:

var Async = require('async');
var Csv = require('csv-streamify');
var Es = require('event-stream');
var Fs = require('fs');
var Mapping = require('./folder2siteRef.json');
var MongoClient = require('mongodb').MongoClient;
var sourcePath = '/hnet/incoming/' + new Date().getFullYear();
Async.auto({
  db: function (callback) {
    console.log('opening db connection');
    MongoClient.connect('mongodb://localhost:27017/test3', callback);
  },
  subDirectory: function (callback) {
    // read the list of subfolder, which are sites
    Fs.readdir(sourcePath, callback);
  },
  loadData: ['db', 'subDirectory', function (callback, results) {
    Async.each(results.subDirectory, load(results.db), callback);
  }],
  cleanUp: ['db', 'loadData', function (callback, results) {
    console.log('closing db connection');
    results.db.close(callback);
  }]
}, function (err) {
  console.log(err || 'Done');
});
var load = function (db) {
  return function (directory, callback) {
    var basePath = sourcePath + '/' + directory;
    Async.waterfall([
      function (callback) {
        Fs.readdir(basePath, callback); // array of files in a directory
      },
      function (files, callback) {
        console.log('loading ' + files.length + ' files from ' + directory);
        Async.each(files, function (file, callback) {
          Fs.createReadStream(basePath + '/' + file)
            .pipe(Csv({objectMode: true, columns: true}))
            .pipe(transform(directory))
            .pipe(batch(200))
            .pipe(insert(db).on('end', callback));
        }, callback);
      }
    ], callback);
  };
};
var transform = function (directory) {
  return Es.map(function (data, callback) {
    data.siteRef = Mapping[directory];
    data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600;
    callback(null, data);
  });
};
var insert = function (db) {
  return Es.map(
    function (data, callback) {
      if (data.length) {
        var bulk = db.collection('hnet').initializeUnorderedBulkOp();
        data.forEach(function (doc) {
          bulk.insert(doc);
        });
        bulk.execute(callback);
      } else {
        callback();
      }
    }
  );
};
var batch = function (batchSize) {
  batchSize = batchSize || 1000;
  var batch = [];
  return Es.through(
    function write (data) {
      batch.push(data);
      if (batch.length === batchSize) {
        this.emit('data', batch);
        batch = [];
      }
    },
    function end () {
      if (batch.length) {
        this.emit('data', batch);
        batch = [];
      }
      this.emit('end');
    }
  );
};

我已经使用流更新了你的tomongo.js脚本。我还更改了它的文件I/o使用异步而不是同步。

我用小数据集测试了在你的代码中定义的结构,它工作得很好。我针对3xdir、900xfiles和288xlines做了一些有限的测试。我不确定每一行数据有多大,所以我加入了一些随机属性。它非常快。看看它是如何处理你的数据的。如果它导致问题,您可以尝试在执行批量插入操作时使用不同的写关注点来限制它。

关于node.js中流的更多信息,请查看以下链接:

http://nodestreams.com -一个由John Resig编写的工具,包含许多流示例。

事件流是一个非常有用的流模块。

最新更新