nodejs-从可读流中查看数据事件,而无需从可读的流中暂停



我们在生产中看到了一些非常高的内存使用情况。这些文件存储在S3中,我们在S3对象上打开一个可读的流,然后我们将数据将数据送到本地文件系统上的文件(在EC2实例上)。我们的一些客户的文件非常大。在一个实例中,他们的文件大小超过6GB,并且正在处理此文件的节点过程使用了太多的内存,以至于我们几乎用尽了所有交换空间,机器却放慢了爬网。显然,我想追踪的地方有一些内存泄漏。

与此同时,当我们从流中看到某些事件时,我将代码稍微登录。我在下面有代码,并使用小型测试文件从日志中输出一些示例输出。让我感到困惑的是,可读的流接收暂停事件,然后继续发出数据和暂停事件,而没有可写的 可写的流散发排水事件。我在这里完全错过了什么吗?一旦暂停可读的流,如何在收到排水管之前继续发射数据事件?Writable Stream尚未表示已准备就绪,因此可读的流不应发送任何内容...对吗?

但要看输出。对我来说,前3个事件很有意义:数据,暂停,排水。然后接下来的3个很好:数据,数据,暂停。但是随后,它发出了另一个数据和另一个暂停事件,然后最终成为第9个事件。我不明白为什么发生事件7和8,因为直到第9次事件才出现排水量。然后在第9次事件发生后再次有一堆数据/暂停对,而没有任何相应的排水。为什么?我期望的是一些数据事件,然后是暂停,然后 nothing 直到发生排水事件 - 在这一点上可能会再次发生数据事件。在我看来,一旦发生暂停,直到排水事件发生射击之前,根本不应发生任何数据事件。也许我仍然从根本上误解了有关节点流的一些东西?

更新:文档没有提及可读流散发的暂停事件的任何内容,但他们确实提到了暂停功能可用。据推测,当可写的流返回false时,这将被调用,我认为暂停功能会发出暂停事件。无论如何,如果调用了暂停(),文档似乎对我的世界看来。请参阅https://nodejs.org/docs/v0.10.30/api/stream.html#stream_class_class_stream_stream_readable

此方法将导致流模式中的流到停止发射数据事件。任何可用的数据都将保留在内部 缓冲区。

此测试是在我的开发机上运行的(Ubuntu 14.04,带有节点V0.10.37)。我们在产品中的EC2实例几乎是相同的。我认为他们现在正在运行v0.10.30。

S3Service.prototype.getFile = function(bucket, key, fileName) {
  var deferred = Q.defer(),
    self = this,
    s3 = self.newS3(),
    fstream = fs.createWriteStream(fileName),
    shortname = _.last(fileName.split('/'));
  logger.debug('Get file from S3 at [%s] and write to [%s]', key, fileName);
  // create a readable stream that will retrieve the file from S3
  var request = s3.getObject({
    Bucket: bucket,
    Key: key
  }).createReadStream();
  // if network request errors out then we need to reject
  request.on('error', function(err) {
      logger.error(err, 'Error encountered on S3 network request');
      deferred.reject(err);
    })
    .on('data', function() {
      logger.info('data event from readable stream for [%s]', shortname);
    })
    .on('pause', function() {
      logger.info('pause event from readable stream for [%s]', shortname);
    });
  // resolve when our writable stream closes, or reject if we get some error
  fstream.on('close', function() {
      logger.info('close event from writable stream for [%s] -- done writing file', shortname);
      deferred.resolve();
    })
    .on('error', function(err) {
      logger.error(err, 'Error encountered writing stream to [%s]', fileName);
      deferred.reject(err);
    })
    .on('drain', function() {
      logger.info('drain event from writable stream for [%s]', shortname);
    });
  // pipe the S3 request stream into a writable file stream
  request.pipe(fstream);
  return deferred.promise;
};

[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv] [2015-05-13T17:21:00.507Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.514Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv] [2015-05-13T17:21:00.595Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.598Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv] [2015-05-13T17:21:00.601Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.603Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv] [2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv] [2015-05-13T17:21:00.688Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv] [2015-05-13T17:21:00.691Z] INFO: worker/7525 on bdmlinux: close event from writable stream for [FeedItem.csv] -- done writing file

您可能有一些类似量子的"观察现象改变了结果"的情况。节点在v0.10中引入了一种新的流方式。从文档中:

如果您附加了数据事件侦听器,则将其切换到流动模式,并且一旦可用,数据将传递给您的处理程序。

也就是说,附加数据侦听器会将流还原为经典流模式。这可能就是为什么您获得的行为与您在其余文档中阅读的内容不一致的行为。为了毫不侵害地观察事物,您可以尝试删除on('data')并在使用through之间插入自己的流:

var through = require('through');
var observer = through(function write(data) {
    console.log('Data!');
    this.queue(data);
}, function end() {
    this.queue(null);
});
request.pipe(observer).pipe(fstream);

相关内容

  • 没有找到相关文章

最新更新