插入流式处理的 XML 数据数据库



我正在尝试有效地插入大量数据(XML文件的大小超过70GB(,而不会使我的MongoDB服务器崩溃。目前,这是我在 NodeJS 中使用xml-stream所做的:

var fs = require('fs'),
path = require('path'),
XmlStream = require('xml-stream'),
MongoClient = require('mongodb').MongoClient,
assert = require('assert'),
ObjectId = require('mongodb').ObjectID,
url = 'mongodb://username:password@my.server:27017/mydatabase',
amount = 0;
var stream = fs.createReadStream(path.join(__dirname, 'motor.xml'));
var xml = new XmlStream(stream);
xml.collect('ns:Statistik');
xml.on('endElement: ns:Statistik', function(item) {
var insertDocument = function(db, callback) {
db.collection('vehicles').insertOne(item, function(err, result) {
amount++;
if (amount % 1000 == 0) {
console.log("Inserted", amount);
}
callback();
});
};
MongoClient.connect(url, function(err, db) {
insertDocument(db, function() {
db.close();
});
});
});

当我调用xml.on()时,它基本上返回我当前所在的树/元素。由于这是直接的 JSON,我可以将其作为参数提供给我的db.collection().insertOne()函数,它会完全按照我想要的方式将其插入到数据库中。

所有代码实际上都像现在一样工作,但它在大约 3000 次插入(大约需要 10 秒(后停止。我怀疑这是因为我打开数据库连接,插入数据,然后每次在XML文件中看到树时关闭连接,在本例中约为3000次。

我可以以某种方式合并insertMany()函数并以 100 秒(或更多(的块为单位进行,但我不太确定如何在流式传输和异步

的情况下工作。所以我的问题是:如何将大量 XML(到 JSON(插入我的 MongoDB 数据库而不会崩溃?

你认为.insertMany()比每次都写更好,所以这实际上只是在"流"上收集数据的问题。

由于执行是"异步"的,因此您通常希望避免堆栈上有太多活动调用,因此通常在调用.insertMany()之前.pause()"流",然后在回调完成后.resume()

var fs = require('fs'),
path = require('path'),
XmlStream = require('xml-stream'),
MongoClient = require('mongodb').MongoClient,
url = 'mongodb://username:password@my.server:27017/mydatabase',
amount = 0;
MongoClient.connect(url, function(err, db) {
var stream = fs.createReadStream(path.join(__dirname, 'motor.xml'));
var xml = new XmlStream(stream);
var docs = [];
//xml.collect('ns:Statistik');
// This is your event for the element matches
xml.on('endElement: ns:Statistik', function(item) {
docs.push(item);           // collect to array for insertMany
amount++;
if ( amount % 1000 === 0 ) { 
xml.pause();             // pause the stream events
db.collection('vehicles').insertMany(docs, function(err, result) {
if (err) throw err;
docs = [];             // clear the array
xml.resume();          // resume the stream events
});
}
});
// End stream handler - insert remaining and close connection
xml.on("end",function() {
if ( amount % 1000 !== 0 ) {
db.collection('vehicles').insertMany(docs, function(err, result) {
if (err) throw err;
db.close();
});
} else {
db.close();
}
});
});

甚至对其进行了一些现代化改造:

const fs = require('fs'),
path = require('path'),
XmlStream = require('xml-stream'),
MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://username:password@my.server:27017/mydatabase';
(async function() {
let amount = 0,
docs = [],
db;
try {
db = await MongoClient.connect(uri);
const stream = fs.createReadStream(path.join(__dirname, 'motor.xml')),
xml = new XmlStream(stream);
await Promise((resolve,reject) => {
xml.on('endElement: ns:Statistik', async (item) => {
docs.push(item);
amount++;
if ( amount % 1000 === 0 ) {
try {
xml.pause();
await db.collection('vehicle').insertMany(docs);
docs = [];
xml.resume();
} catch(e) {
reject(e)
}
}
});
xml.on('end',resolve);
xml.on('error',reject);
});
if ( amount % 1000 !== 0 ) {
await db.collection('vehicle').insertMany(docs);
}
} catch(e) {
console.error(e);
} finally {
db.close();
}
})();

请注意,MongoClient连接实际上包装了所有其他操作。您只想连接一次,其他操作发生在"流"的事件处理程序上。

因此,对于您的XMLStream,事件处理程序是在表达式匹配以及提取和收集到数组中的数据时触发的。每 1000 个项目,.insertMany()调用就会插入文档,在"异步"调用上"暂停"和"恢复"。

完成后,将在"流"上触发"结束"事件。这是您关闭数据库连接的地方,事件循环将被释放并结束程序。

虽然可以通过允许各种.insertMany()调用同时发生(并且通常为"池大小",以免超出调用堆栈(来获得一定程度的"并行性"(,但这基本上是最简单的过程外观,只需在等待其他异步 I/O 完成时暂停即可。

注意:根据后续问题从原始代码中注释掉.collect()方法,这似乎没有必要,实际上保留了内存中的节点,这些节点在每次写入数据库后确实应该丢弃。

最新更新