如何将可写缓冲区管道传输到读取流?



如何获取可写流并从缓冲区返回可读流?

我有以下内容将来自ftp服务器的数据写入块数组:

let chunks = []
let writable = new Writable
writable._write = (chunk, encoding, callback) => {
chunks.push(chunk)
callback()
}

然后我正在创建一个新的阅读流:

let readable = new ReadStream()

然后我尝试将可写内容通过管道传输到可读内容,但这似乎不起作用:

类型为"ReadStream"的参数不能分配给类型为"WritableStream"的参数。

writable.pipe(readable)

这是整个方法:

export class FTP {
readStream(filePath, options = {}) {
let conn = this.getConnection(this.name)
if (!conn) return Buffer.from('')
filePath = this.forceRoot(filePath) 
let chunks = []
let writable = new Writable
writable._write = (chunk, encoding, callback) => {
chunks.push(chunk)
callback()
}
let readable = new ReadStream()
conn.client.download(writable, filePath, options.start || undefined)
writable.pipe(readable)
return readable
}
}

然后,我从流中读取并将输出通过管道传输到从http.createServer()创建的响应对象,如下所示:

let stream = store.readStream(file, { start, end })
.on('open', () => stream.pipe(res))
.on('close', () => res.end())
.on('error', err => res.end(err))

是的,Node.js流很难掌握。从逻辑上讲,这里不需要两个流。如果要像从流中读取一样从 FTP 类中读取,则只需实现单个可读流。查看文档的这一部分,了解如何从头开始实现可读流:

class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}

但是,从您的示例中,我可以得出结论,conn.client.download()期望将可写流作为输入参数。在这种情况下,您只需要采用标准的直通流,它是一个双工(即左侧可写,右侧可读(流,未应用转换:

const { PassThrough } = require('stream');
export class FTP {
readStream(filePath, options = {}) {
let conn = this.getConnection(this.name);
if (!conn) return Buffer.from('');
filePath = this.forceRoot(filePath);

const pt = new PassThrough();
conn.client.download(pt, filePath, options.start);
return pt;
}
}

您可以在此处和此处找到有关 Node.js 流的更多信息。

UPD: 使用示例:

// assume res is an [express or similar] response object.
const s = store.readStream(file, { start, end });
s.pipe(res);

Pipe 在你思考时以相反的方式工作。根据Node.js的文档,pipe()是一种Readable的方法,它接受一个Writable作为它的目的地。你试图做的是将Writable管道传输到Readable,但实际上它是一个可以管道到WriteableReadable,而不是相反。

尝试将PassThrough传递给download()并返回相同的PassThrough

最新更新