在尝试使用Node.JS流进行实验时,我遇到了一个有趣的难题。当输入(可读)流推送更多数据时,目的地(可写)关心我无法正确施加背压。
我尝试的两种方法是从Writable.prototype._write
返回false,并保留对Readable的引用,这样我就可以从Writable调用Readable.pause()
。这两种解决方案都没有多大帮助,我将对此进行解释。
在我的练习中(你可以将完整的源代码视为Gist),我有三个流:
可读-密码生成器
util.inherits(PasscodeGenerator, stream.Readable);
function PasscodeGenerator(prefix) {
stream.Readable.call(this, {objectMode: true});
this.count = 0;
this.prefix = prefix || '';
}
PasscodeGenerator.prototype._read = function() {
var passcode = '' + this.prefix + this.count;
if (!this.push({passcode: passcode})) {
this.pause();
this.once('drain', this.resume.bind(this));
}
this.count++;
};
我认为this.push()
的返回代码足以自我暂停并等待drain
事件恢复。
转换-哈希器
util.inherits(Hasher, stream.Transform);
function Hasher(hashType) {
stream.Transform.call(this, {objectMode: true});
this.hashType = hashType;
}
Hasher.prototype._transform = function(sample, encoding, next) {
var hash = crypto.createHash(this.hashType);
hash.setEncoding('hex');
hash.write(sample.passcode);
hash.end();
sample.hash = hash.read();
this.push(sample);
next();
};
只需将密码的散列添加到对象中。
可书写-示例消费者
util.inherits(SampleConsumer, stream.Writable);
function SampleConsumer(max) {
stream.Writable.call(this, {objectMode: true});
this.max = (max != null) ? max : 10;
this.count = 0;
}
SampleConsumer.prototype._write = function(sample, encoding, next) {
this.count++;
console.log('Hash %d (%s): %s', this.count, sample.passcode, sample.hash);
if (this.count < this.max) {
next();
} else {
return false;
}
};
在这里,我想尽可能快地消耗数据,直到达到最大样本数,然后结束流。我尝试使用this.end()
而不是return false
,但这导致了在结束后调用的可怕的写入问题。如果样本量很小,返回false确实会停止所有操作,但当样本量很大时,我会出现内存不足错误:
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
Aborted (core dumped)
根据这个SO答案,理论上Write流将返回false,导致流缓冲,直到缓冲区满为止(objectMode
默认为16),最终Readable将调用它的this.pause()
方法。但16 + 16 + 16 = 48
;缓冲区中有48个物体,直到东西装满,系统堵塞。实际上更少,因为不涉及克隆,所以它们之间传递的对象是相同的引用。在高水位线停止一切之前,这难道不意味着记忆中只有16个物体吗?
最后,我意识到我可以让Writable引用Readable使用闭包来调用它的pause方法。然而,这个解决方案意味着可写流对另一个对象了解很多。我必须通过一个参考:
var foo = new PasscodeGenerator('foobar');
foo
.pipe(new Hasher('md5'))
.pipe(new SampleConsumer(samples, foo));
对于流的工作方式来说,这感觉是不正常的。我认为背压足以导致可写表阻止Readable推送数据并防止内存不足错误。
一个类似的例子是Unix head
命令。在Node I中实现这一点时,我会假设目标可以结束,而不仅仅是忽略导致源继续推送数据,即使目标有足够的数据来满足文件的开头部分。
我如何惯用地构造自定义流,以便当目标准备好结束时,源流不会尝试推送更多数据?
这是内部调用_read()
的已知问题。由于_read()
总是同步/立即推送,因此内部流实现可以在适当的条件下进入循环。_read()
实现通常被期望进行某种异步I/O(例如,从磁盘或网络读取)。
解决方法(如上面的链接中所述)是使_read()
至少在某些时候异步。您也可以在每次调用时使其异步:
PasscodeGenerator.prototype._read = function(n) {
var passcode = '' + this.prefix + this.count;
var self = this;
// `setImmediate()` delays the push until the beginning
// of the next tick of the event loop
setImmediate(function() {
self.push({passcode: passcode});
});
this.count++;
};