在节点中.js我有一个读取流,我希望重新格式化并写入数据库。由于读取流速度快,写入速度慢,因此节点.js队列可能会随着写入队列的建立而不堪重负(假设流是 GB 的数据)。如何强制读取等待代码的写入部分,以便在不阻塞的情况下不会发生这种情况?
var request = http.get({
host: 'api.geonames.org',
port: 80,
path: '/children?' + qs.stringify({
geonameId: geonameId,
username: "demo"
})
}).on('response', function(response) {
response.setEncoding('utf8');
var xml = new XmlStream(response, 'utf8');
xml.on('endElement: geoname ', function(input) {
console.log('geoname');
var output = new Object();
output.Name = input.name;
output.lat = input.lat;
output.lng = input.lng;
output._key = input.geonameId;
data.db.document.create(output, data.doc, function(callback){
//this is really slow.
}
// i do not want to return from here and receive more data until the 'create' above has completed
});
});
我昨晚刚刚遇到了这个问题,在我的黑客马拉松诱导睡眠不足的状态下,这是我解决它的方法:
每当我发送作业进行处理时,我都会递增一个计数器,并在操作完成时递减计数器。为了防止出站流量压倒其他服务,我会在有一定数量的待处理出站请求时暂停流。代码与以下内容非常相似。
var instream = fs.createReadStream('./combined.csv');
var outstream = new stream;
var inProcess = 0;
var paused = false;
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
inProcess++;
if(inProcess > 100) {
console.log('pausing input to clear queue');
rl.pause();
paused = true;
}
someService.doSomethingSlow(line, function() {
inProcess--;
if(paused && inProcess < 10) {
console.log('resuming stream');
paused = false;
rl.resume();
}
if (err) throw err;
});
});
rl.on('end', function() {
rl.close();
});
不是最优雅的解决方案,但它有效并允许我处理百万+行,而不会耗尽内存或限制其他服务。
我的解决方案只是扩展了一个空stream.Writable
,并且与@Timothy的解决方案基本相同,但使用事件和不依赖于 Streams1 .pause()
和.resume()
(这似乎对我的数据管道没有任何影响,反正)。
var stream = require("stream");
var liveRequests = 0;
var maxLiveRequests = 100;
var streamPaused = false;
var requestClient = new stream.Writable();
function requestCompleted(){
liveRequests--;
if(streamPaused && liveRequests < maxLiveRequests){
streamPaused = false;
requestClient.emit("resumeStream");
}
}
requestClient._write = function (data, enc, next){
makeRequest(data, requestCompleted);
liveRequests++;
if(liveRequests >= maxLiveRequests){
streamPaused = true;
requestClient.once("resumeStream", function resume(){
next();
});
}
else {
next();
}
};
计数器 liveRequests
跟踪并发请求数,并在每当 makeRequest()
被调用,并在完成时(即,当调用requestCompleted()
时)递减。如果请求具有刚刚制作并且liveRequests
超过maxLiveRequests
,我们暂停流与streamPaused
.如果请求完成,流暂停,liveRequests
现在小于 maxLiveRequests
,我们可以恢复流。因为后续数据项由_write()
在调用其next()
回调时读取,我们可以简单地推迟后者自定义 "resumeStream"
事件上的事件侦听器,模拟暂停/恢复。现在,只需readStream.pipe(requestClient)
.
编辑:我将这个解决方案以及输入数据的自动批处理抽象到一个包中。