流.转换在完成输出之前接受新输入



我正在实现一个带有异步操作的转换流。我的叫Parser.

var Transform = require('stream').transform;
function Parser(options) {
  Transform.call(this, {objectMode: true});
}
Parser.prototype._transform = function _transform(input, encoding, callback) {
  var this_ = this;
  doSomethingAsync(input, function(output) {
    this_.push(output);
    //possible location #1 for callback();
  });
  //possible location #2 for callback();
}

每个传入的块可能需要很长时间来处理(doSomethingAsync需要网络请求)。但是,每个块的处理完全独立于之前的块。此外,输出的确切顺序并不重要。每个输出都包含一个描述符,用于标识其输入,而不是按顺序标识。

因此,我希望尽快再次调用_transform,而不是等到给定的块完全完成处理。因此,查看代码,如果我callback()放入possible location #1,则在每个块完全处理之前永远不会调用_transform。但是如果我把它放在possible location #2,那么我的流在回调之后推送,这会导致这些难看

Uncaught Error: stream.push() after EOF

流终止后出错。

所以我的问题:是否可以使用转换流来做到这一点?还是我应该考虑使用库?如果是,哪种类型(事件流、FRP 等)?

谢谢。

您可以在流上实现_flush(),并且仅在所有异步函数完成时调用传递给该函数的回调。像这样:

function Parser(options) {
  Transform.call(this, {objectMode: true});
  this._pending = 0;
  this._flushcb = undefined;
}
Parser.prototype._transform = function _transform(input, encoding, callback) {
  var self = this;
  ++this._pending;
  doSomethingAsync(input, function(output) {
    self.push(output);
    if (--self._pending === 0 && self._flushcb)
      self._flushcb();
  });
  callback();
}
Parser.prototype._flush = function(callback) {
  this._flushcb = callback;
};
我相信

答案并不完整。想象一下,您有这样的_transform()

_transform(chunk, encoding, done) {
    let data = chunk.toString();
    this.rest += data;
    [this.toPush, this.rest] = this.f(this.rest);
    for (let i = 0; i < this.toPush.length; i++) {
        if (!this.push(this.toPush[i])) {
            this._source.readStop();
            break;
        } 
    }
    done()
}
'

''

例如,其中f是一个将收到的块拆分为段落的函数。 rest是块末尾的内容f无法确定它是否是整个段落,因此需要更多的数据(另一个块)。当所有内容都读完后,可以假设rest是整个段落,然后使用_flush来推动它,如下所示。抛出上述异常,可能是因为"<p>"+this.rest+"</p>"大于 this.rest 。这不是真正的预期行为...

 _flush(done) {
    if (this.rest !== "") this.push("<p>"+this.rest+"</p>");
    this.rest = null;
    this.toPush = null;
    done()
 }

编辑:所以Calvin Metcalf给了我一个解决方法 herehttps://github.com/nodejs/readable-stream/issues/207:在节点8.0.0上,可以使用_final而不是_flush。这个问题似乎很不稳定,因为他没有在他的环境中重现。

相关内容

  • 没有找到相关文章

最新更新