我一直在尝试使用可读和转换流来处理一个非常大的文件。我似乎遇到的问题是,如果我不把可写流放在最后,程序似乎在返回结果之前就终止了。
示例:rstream.pipe(split()).pipe(tstream)
我的tstream
有一个发射器,当计数器达到阈值时发射。当阈值设置为一个较低的值时,我得到一个结果,但当它设置为高时,它不返回任何东西。如果我把它管道到一个文件写入器,它总是返回一个结果。我错过了什么明显的东西吗?
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream);
// this always works
rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
以下是Qtransformstream
的代码// Dependencies
var Transform = require('stream').Transform,
util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
// Create this as a Transform Stream
Transform.call(this, {
objectMode: true
});
// Default the Qbase to 32 as an assumption
this.Qbase = 32;
if (Quser) {
this.Quser = Quser;
} else {
this.Quser = 20;
}
this.Qpass = this.Quser + this.Qbase;
this.Counter = 0;
// Variables used as intermediates
this.Qmin = 120;
this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
var Qmin = this.Qmin;
var Qmax = this.Qmax;
var Qbase = this.Qbase;
var Quser = this.Quser;
this.Counter++;
// Stop the stream after 100 reads and emit the data
if (this.Counter === 100) {
this.emit('completed', this.Qbase, this.Quser);
}
// do some calcs on this.Qbase
this.push('something not important');
callback();
};
// export the object
module.exports = TransformStream;
EDIT:
另外,我不知道你的计数器有多高,但是如果你填满缓冲区,它将停止向转换流传递数据,在这种情况下completed
从未真正命中,因为你从未达到计数器限制。试着改变你的highwatermark
.
EDIT 2: A Little Better Explanation
众所周知,transform stream
是一个双工流,这基本上意味着它可以从源接收数据,并且可以将数据发送到目的地。这通常分别被称为读和写。transform stream
继承自Node.js实现的read stream
和write stream
。不过有一点需要注意,transform stream
不必实现_read或_write函数。在这个意义上,你可以把它看作是鲜为人知的直通流。
如果您考虑transform stream
实现write stream
的事实,您还必须考虑写流总是有一个转储其内容的目的地的事实。问题,你有是,当你创建一个transform stream
你不能指定一个地方发送你的内容。将数据完全通过转换流传递的唯一方法是将其管道传输到写流,否则,实际上您的流被备份并且不能接受更多的数据,因为没有数据的位置。
这就是为什么当你管道到写流时它总是工作的原因。写流通过将数据发送到目的地来减轻数据备份,因此所有数据都将通过管道传输,并且将触发完成事件。
当样本大小很小的时候,你的代码在没有写流的情况下工作的原因是你没有填充你的流,所以转换流可以接受足够的数据来允许完整的事件/阈值被击中。随着阈值的增加,流可以接受的数据量保持不变,而无需将其发送到另一个地方(写流)。这将导致您的流被备份,并且它不能再接受数据,这意味着完成的事件将永远不会被发出。
我冒昧地说,如果您增加转换流的highwatermark
,您将能够增加阈值,并且仍然使代码工作。这种方法是不正确的。将你的流管道到一个写流,这个写流将把数据发送到dev/null。创建这个写流的方法是:
var writer = fs.createWriteStream('/dev/null');
Node.js文档中关于缓冲的部分解释了你遇到的错误
你不中断_transform和进程远。试一试:
this.emit('completed', ...);
this.end();
这就是为什么程序似乎在返回结果之前就终止了
并且不要输出任何无用的数据:
var wstream = fs.createWriteStream('/dev/null');
好运)
我建议使用Writable流而不是Transform流。然后将_transform
重命名为_write
,如果您管道到它,您的代码将使用该流。一个转换流,正如@Bradgnar已经指出的,需要一个消费者,否则它会阻止可读流将更多的数据推送到它的缓冲区。