如何获取可写流并从缓冲区返回可读流?
我有以下内容将来自ftp服务器的数据写入块数组:
let chunks = []
let writable = new Writable
writable._write = (chunk, encoding, callback) => {
chunks.push(chunk)
callback()
}
然后我正在创建一个新的阅读流:
let readable = new ReadStream()
然后我尝试将可写内容通过管道传输到可读内容,但这似乎不起作用:
类型为"ReadStream"的参数不能分配给类型为"WritableStream"的参数。
writable.pipe(readable)
这是整个方法:
export class FTP {
readStream(filePath, options = {}) {
let conn = this.getConnection(this.name)
if (!conn) return Buffer.from('')
filePath = this.forceRoot(filePath)
let chunks = []
let writable = new Writable
writable._write = (chunk, encoding, callback) => {
chunks.push(chunk)
callback()
}
let readable = new ReadStream()
conn.client.download(writable, filePath, options.start || undefined)
writable.pipe(readable)
return readable
}
}
然后,我从流中读取并将输出通过管道传输到从http.createServer()
创建的响应对象,如下所示:
let stream = store.readStream(file, { start, end })
.on('open', () => stream.pipe(res))
.on('close', () => res.end())
.on('error', err => res.end(err))
是的,Node.js流很难掌握。从逻辑上讲,这里不需要两个流。如果要像从流中读取一样从 FTP 类中读取,则只需实现单个可读流。查看文档的这一部分,了解如何从头开始实现可读流:
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
}
但是,从您的示例中,我可以得出结论,conn.client.download()
期望将可写流作为输入参数。在这种情况下,您只需要采用标准的直通流,它是一个双工(即左侧可写,右侧可读(流,未应用转换:
const { PassThrough } = require('stream');
export class FTP {
readStream(filePath, options = {}) {
let conn = this.getConnection(this.name);
if (!conn) return Buffer.from('');
filePath = this.forceRoot(filePath);
const pt = new PassThrough();
conn.client.download(pt, filePath, options.start);
return pt;
}
}
您可以在此处和此处找到有关 Node.js 流的更多信息。
UPD: 使用示例:
// assume res is an [express or similar] response object.
const s = store.readStream(file, { start, end });
s.pipe(res);
Pipe 在你思考时以相反的方式工作。根据Node.js的文档,pipe()
是一种Readable
的方法,它接受一个Writable
作为它的目的地。你试图做的是将Writable
管道传输到Readable
,但实际上它是一个可以管道到Writeable
的Readable
,而不是相反。
尝试将PassThrough
传递给download()
并返回相同的PassThrough
?