Node.js流可读转换



我一直在尝试使用可读和转换流来处理一个非常大的文件。我似乎遇到的问题是,如果我不把可写流放在最后,程序似乎在返回结果之前就终止了。

示例:rstream.pipe(split()).pipe(tstream)

我的tstream有一个发射器,当计数器达到阈值时发射。当阈值设置为一个较低的值时,我得到一个结果,但当它设置为高时,它不返回任何东西。如果我把它管道到一个文件写入器,它总是返回一个结果。我错过了什么明显的东西吗?

代码:

// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
    console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
    // THIS WORKS if i have a low counter for qtransformstream, 
    // but when it's high, I do not get a result
    //   rstream.pipe(split()).pipe(qtransformstream);
    // this always works
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
以下是Qtransformstream 的代码
// Dependencies
var Transform = require('stream').Transform,
    util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
    // Create this as a Transform Stream
    Transform.call(this, {
        objectMode: true
    });
    // Default the Qbase to 32 as an assumption
    this.Qbase = 32;
    if (Quser) {
        this.Quser = Quser;
    } else {
        this.Quser = 20;
    }
    this.Qpass = this.Quser + this.Qbase;
    this.Counter = 0;
    // Variables used as intermediates
    this.Qmin = 120;
    this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
    var Qmin = this.Qmin;
    var Qmax = this.Qmax;
    var Qbase = this.Qbase;
    var Quser = this.Quser;
    this.Counter++;
    // Stop the stream after 100 reads and emit the data
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
    }
    // do some calcs on this.Qbase
    this.push('something not important');
    callback();
};
// export the object
module.exports = TransformStream;

EDIT:

另外,我不知道你的计数器有多高,但是如果你填满缓冲区,它将停止向转换流传递数据,在这种情况下completed从未真正命中,因为你从未达到计数器限制。试着改变你的highwatermark .

EDIT 2: A Little Better Explanation

众所周知,transform stream 是一个双工流,这基本上意味着它可以从源接收数据,并且可以将数据发送到目的地。这通常分别被称为读和写。transform stream继承自Node.js实现的read streamwrite stream。不过有一点需要注意,transform stream 不必实现_read或_write函数。在这个意义上,你可以把它看作是鲜为人知的直通流。

如果您考虑transform stream实现write stream的事实,您还必须考虑写流总是有一个转储其内容的目的地的事实。问题,你有是,当你创建一个transform stream你不能指定一个地方发送你的内容。将数据完全通过转换流传递的唯一方法是将其管道传输到写流,否则,实际上您的流被备份并且不能接受更多的数据,因为没有数据的位置。

这就是为什么当你管道到写流时它总是工作的原因。写流通过将数据发送到目的地来减轻数据备份,因此所有数据都将通过管道传输,并且将触发完成事件。

当样本大小很小的时候,你的代码在没有写流的情况下工作的原因是你没有填充你的流,所以转换流可以接受足够的数据来允许完整的事件/阈值被击中。随着阈值的增加,流可以接受的数据量保持不变,而无需将其发送到另一个地方(写流)。这将导致您的流被备份,并且它不能再接受数据,这意味着完成的事件将永远不会被发出。

我冒昧地说,如果您增加转换流的highwatermark,您将能够增加阈值,并且仍然使代码工作。这种方法是不正确的。将你的流管道到一个写流,这个写流将把数据发送到dev/null。创建这个写流的方法是:

var writer = fs.createWriteStream('/dev/null');

Node.js文档中关于缓冲的部分解释了你遇到的错误

你不中断_transform和进程远。试一试:

this.emit('completed', ...);
this.end();

这就是为什么程序似乎在返回结果之前就终止了

并且不要输出任何无用的数据:

var wstream = fs.createWriteStream('/dev/null');

好运)

我建议使用Writable流而不是Transform流。然后将_transform重命名为_write,如果您管道到它,您的代码将使用该流。一个转换流,正如@Bradgnar已经指出的,需要一个消费者,否则它会阻止可读流将更多的数据推送到它的缓冲区。

相关内容

  • 没有找到相关文章

最新更新