如何在nodejs中的管道流中发生错误事件后恢复



在MyWritableStream中发出错误事件后,数据传输停止。我需要做什么才能恢复数据传输?

var readable = fs.createReadStream('test.txt');
var writable = new MyWritableStream();
writable.on('error', function(error) {
    console.log('error', error);
    // How i can resume?
});
writable.on('finish', function(){
    console.log('finished');
})
readable.pipe(writable);

我知道这个问题很老了,但你可能想看看https://github.com/miraclx/xresilient

我建造这个完全是出于同样的原因(最适合查找流)。

您定义了一个返回可读流的函数,库会测量在出现错误之前通过的字节数。

一旦可读流遇到error事件,它就会用读取的字节数调用已定义的函数,以便对流源进行索引。

示例:

const fs = require('fs');
const xresilient = require('xresilient');
const readable = xresilient(({bytesRead}) => {
  return generateSeekableStreamSomehow({start: bytesRead});
}, {retries: 5});
const writable = fs.createWriteStream('file.test');
readable.pipe(writable);
  • 文件流可以使用fs.createReadStream()函数的start选项进行索引
  • HTTP请求可使用Range HTTP头进行索引

看看吧。https://www.npmjs.com/package/xresilient

我不确定这是否是一种正常的做法,但我现在看不到其他解决方案&它对我有效。如果你能建议更准确的解决方案,请这样做。

我们可以使用可写事件中的pipe事件来跟踪可读流实例:

function WriteableStream(options) {
    Writable.call(this, options);
    this.source = null;
    var instance = this;
    this.on('pipe', function(source){
        instance.source = source;
    });
}
util.inherits(WriteableStream, Writable);

所以,当我们发出错误事件,可读流被自动取消管道时,我们可以自己重新管道:

WriteableStream.prototype._write = function(chunk, encoding, done) {
    this.emit('error', new Error('test')); // unpipes readable
    done();
};
WriteableStream.prototype.resume = function() {
    this.source.pipe(this); // re-pipes readable
}

最后,我们将以以下方式使用它:

var readable = fs.createReadStream(file);
var writeable = new WriteableStream();
writeable.on('error', function(error) {
    console.log('error', error);
    writeable.resume();
});
readable.pipe(writeable);

相关内容

  • 没有找到相关文章

最新更新