如何对节点流应用背压

  • 本文关键字:应用 节点 node.js stream
  • 更新时间 :
  • 英文 :


在尝试使用Node.JS流进行实验时,我遇到了一个有趣的难题。当输入(可读)流推送更多数据时,目的地(可写)关心我无法正确施加背压。

我尝试的两种方法是从Writable.prototype._write返回false,并保留对Readable的引用,这样我就可以从Writable调用Readable.pause()。这两种解决方案都没有多大帮助,我将对此进行解释。

在我的练习中(你可以将完整的源代码视为Gist),我有三个流:

可读-密码生成器

util.inherits(PasscodeGenerator, stream.Readable);
function PasscodeGenerator(prefix) {
  stream.Readable.call(this, {objectMode: true});
  this.count  = 0;
  this.prefix = prefix || '';
}
PasscodeGenerator.prototype._read = function() {
  var passcode = '' + this.prefix + this.count;
  if (!this.push({passcode: passcode})) {
    this.pause();
    this.once('drain', this.resume.bind(this));
  }
  this.count++;
};

我认为this.push()的返回代码足以自我暂停并等待drain事件恢复。

转换-哈希器

util.inherits(Hasher, stream.Transform);
function Hasher(hashType) {
  stream.Transform.call(this, {objectMode: true});
  this.hashType = hashType;
}
Hasher.prototype._transform = function(sample, encoding, next) {
  var hash = crypto.createHash(this.hashType);
  hash.setEncoding('hex');
  hash.write(sample.passcode);
  hash.end();
  sample.hash = hash.read();
  this.push(sample);
  next();
};

只需将密码的散列添加到对象中。

可书写-示例消费者

util.inherits(SampleConsumer, stream.Writable);
function SampleConsumer(max) {
  stream.Writable.call(this, {objectMode: true});
  this.max   = (max != null) ? max : 10;
  this.count = 0;
}
SampleConsumer.prototype._write = function(sample, encoding, next) {
  this.count++;
  console.log('Hash %d (%s): %s', this.count, sample.passcode, sample.hash);
  if (this.count < this.max) {
    next();
  } else {
    return false;
  }
};

在这里,我想尽可能快地消耗数据,直到达到最大样本数,然后结束流。我尝试使用this.end()而不是return false,但这导致了在结束后调用的可怕的写入问题。如果样本量很小,返回false确实会停止所有操作,但当样本量很大时,我会出现内存不足错误:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
Aborted (core dumped)

根据这个SO答案,理论上Write流将返回false,导致流缓冲,直到缓冲区满为止(objectMode默认为16),最终Readable将调用它的this.pause()方法。但16 + 16 + 16 = 48;缓冲区中有48个物体,直到东西装满,系统堵塞。实际上更少,因为不涉及克隆,所以它们之间传递的对象是相同的引用。在高水位线停止一切之前,这难道不意味着记忆中只有16个物体吗?

最后,我意识到我可以让Writable引用Readable使用闭包来调用它的pause方法。然而,这个解决方案意味着可写流对另一个对象了解很多。我必须通过一个参考:

var foo = new PasscodeGenerator('foobar');
foo
  .pipe(new Hasher('md5'))
  .pipe(new SampleConsumer(samples, foo));

对于流的工作方式来说,这感觉是不正常的。我认为背压足以导致可写表阻止Readable推送数据并防止内存不足错误。

一个类似的例子是Unix head命令。在Node I中实现这一点时,我会假设目标可以结束,而不仅仅是忽略导致源继续推送数据,即使目标有足够的数据来满足文件的开头部分。

我如何惯用地构造自定义流,以便当目标准备好结束时,源流不会尝试推送更多数据?

这是内部调用_read()的已知问题。由于_read()总是同步/立即推送,因此内部流实现可以在适当的条件下进入循环。_read()实现通常被期望进行某种异步I/O(例如,从磁盘或网络读取)。

解决方法(如上面的链接中所述)是使_read()至少在某些时候异步。您也可以在每次调用时使其异步:

PasscodeGenerator.prototype._read = function(n) {
  var passcode = '' + this.prefix + this.count;
  var self = this;
  // `setImmediate()` delays the push until the beginning
  // of the next tick of the event loop
  setImmediate(function() {
    self.push({passcode: passcode});
  });
  this.count++;
};

相关内容

  • 没有找到相关文章

最新更新