在快速流读取流中平衡慢速 I/O



在节点中.js我有一个读取流,我希望重新格式化并写入数据库。由于读取流速度快,写入速度慢,因此节点.js队列可能会随着写入队列的建立而不堪重负(假设流是 GB 的数据)。如何强制读取等待代码的写入部分,以便在不阻塞的情况下不会发生这种情况?

var request = http.get({
      host: 'api.geonames.org',
      port: 80,
      path: '/children?' + qs.stringify({
      geonameId: geonameId,
      username: "demo"
   })
}).on('response', function(response) {
   response.setEncoding('utf8');
   var xml = new XmlStream(response, 'utf8');
   xml.on('endElement: geoname ', function(input) {  
      console.log('geoname');
      var output = new Object();
      output.Name = input.name;
      output.lat = input.lat;
      output.lng = input.lng;
      output._key = input.geonameId;
      data.db.document.create(output, data.doc, function(callback){    
         //this is really slow.
      }
      // i do not want to return from here and receive more data until the 'create' above has completed
   });  
});

我昨晚刚刚遇到了这个问题,在我的黑客马拉松诱导睡眠不足的状态下,这是我解决它的方法:

每当我发送作业进行处理时,

我都会递增一个计数器,并在操作完成时递减计数器。为了防止出站流量压倒其他服务,我会在有一定数量的待处理出站请求时暂停流。代码与以下内容非常相似。

var instream = fs.createReadStream('./combined.csv');
var outstream = new stream;
var inProcess = 0;
var paused = false;
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
    inProcess++;
    if(inProcess > 100) {
        console.log('pausing input to clear queue');
        rl.pause();
        paused = true;
    }
    someService.doSomethingSlow(line, function() {
        inProcess--;
        if(paused && inProcess < 10) {
            console.log('resuming stream');
            paused = false;
            rl.resume();
        }
        if (err) throw err;
    });
});
rl.on('end', function() {
    rl.close();
});

不是最优雅的解决方案,但它有效并允许我处理百万+行,而不会耗尽内存或限制其他服务。

我的解决方案只是扩展了一个空stream.Writable,并且与@Timothy的解决方案基本相同,但使用事件和不依赖于 Streams1 .pause().resume()(这似乎对我的数据管道没有任何影响,反正)。

var stream = require("stream");
var liveRequests = 0;
var maxLiveRequests = 100;
var streamPaused = false;
var requestClient = new stream.Writable();
function requestCompleted(){
    liveRequests--;
    if(streamPaused && liveRequests < maxLiveRequests){
        streamPaused = false;
        requestClient.emit("resumeStream");
    }
}
requestClient._write = function (data, enc, next){
    makeRequest(data, requestCompleted);
    liveRequests++;
    if(liveRequests >= maxLiveRequests){
        streamPaused = true;
        requestClient.once("resumeStream", function resume(){
            next();
        });
    }
    else {
        next();
    }
};

计数器 liveRequests 跟踪并发请求数,并在每当 makeRequest()被调用,并在完成时(即,当调用requestCompleted()时)递减。如果请求具有刚刚制作并且liveRequests超过maxLiveRequests,我们暂停流与streamPaused.如果请求完成,流暂停,liveRequests现在小于 maxLiveRequests ,我们可以恢复流。因为后续数据项由_write()在调用其next()回调时读取,我们可以简单地推迟后者自定义 "resumeStream" 事件上的事件侦听器,模拟暂停/恢复。现在,只需readStream.pipe(requestClient).


编辑:我将这个解决方案以及输入数据的自动批处理抽象到一个包中。

相关内容

  • 没有找到相关文章

最新更新