>在Node中.js我正在使用fs.createWriteStream
方法将数据附加到本地文件。在 Node 文档中,他们提到了使用 fs.createWriteStream
时的drain
事件,但我不明白。
var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);
在上面的代码中,如何使用 drain 事件?事件在下面是否正确使用?
var data = 'this is my data';
if (!streamExists) {
var stream = fs.createWriteStream('fileName.txt');
}
var result = stream.write(data);
if (!result) {
stream.once('drain', function() {
stream.write(data);
});
}
drain
事件适用于可写流的内部缓冲区被清空的情况。
仅当内部缓冲区的大小超过其 highWaterMark
属性(可写流的内部缓冲区内,直到它停止从数据源读取)之前,才会发生这种情况。
发生此类事件的原因可能是由于设置涉及从一个流读取数据源的速度比写入另一个资源的速度快。例如,采用两个流:
var fs = require('fs');
var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');
现在想象一下,文件read
在 SSD 上,可以以 500MB/s 的速度读取,而write
位于只能以 150MB/s
的速度写入的 HDD 上。写入流将无法跟上,并将开始在内部缓冲区中存储数据。一旦缓冲区到达highWaterMark
,默认为16KB,写入将开始返回false
,并且流将在内部排队排出。一旦内部缓冲区的长度为 0,则触发 drain
事件。
这是排水管的工作原理:
if (state.length === 0 && state.needDrain) {
state.needDrain = false;
stream.emit('drain');
}
这些是排水的先决条件,排水是writeOrBuffer
功能的一部分:
var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
若要了解如何使用 drain
事件,请以 Node.js 文档中的示例为例。
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write();
function write() {
var ok = true;
do {
i -= 1;
if (i === 0) {
// last time!
writer.write(data, encoding, callback);
} else {
// see if we should continue, or wait
// don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// had to stop early!
// write some more once it drains
writer.once('drain', write);
}
}
}
该函数的目标是向可写流写入 1,000,000 次。发生的情况是将变量设置为 true ok
并且仅在 ok
为 true 时执行循环。对于每个循环迭代,ok
的值设置为 stream.write()
的值,如果需要drain
,它将返回 false。如果 ok
变为 false,则 drain
的事件处理程序将等待并着火,恢复写入。
具体到您的代码,您不需要使用 drain
事件,因为您只在打开流后立即编写一次。由于尚未向流写入任何内容,因此内部缓冲区为空,并且必须以块形式写入至少 16KB 才能触发drain
事件。drain
事件用于多次写入比可写流的highWaterMark
设置更多的数据。
想象一下,您正在连接 2 个带宽非常不同的流,例如,将本地文件上传到慢速服务器。(快速)文件流发出数据的速度将快于(慢速)套接字流消耗数据的速度。
在这种情况下,node.js 会将数据保留在内存中,直到慢速流有机会处理它。如果文件非常大,这可能会出现问题。
为了避免这种情况,Stream.write
在基础系统缓冲区已满时返回false
。如果停止写入,流稍后将发出 drain
事件,指示系统缓冲区已清空,适合再次写入。
您可以使用pause/resume
可读流并控制可读流的带宽。
更好:您可以使用readable.pipe(writable)
为您执行此操作。
编辑:你的代码中有一个错误:无论write
返回什么,你的数据都已经写入。您无需重试。在你的情况下,你写了两次data
。
这样的东西会起作用:
var packets = […],
current = -1;
function niceWrite() {
current += 1;
if (current === packets.length)
return stream.end();
var nextPacket = packets[current],
canContinue = stream.write(nextPacket);
// wait until stream drains to continue
if (!canContinue)
stream.once('drain', niceWrite);
else
niceWrite();
}
这是一个带有 async/await 的版本
const write = (writer, data) => {
return new Promise((resolve) => {
if (!writer.write(data)) {
writer.once('drain', resolve)
}
else {
resolve()
}
})
}
// usage
const run = async () => {
const write_stream = fs.createWriteStream('...')
const max = 1000000
let current = 0
while (current <= max) {
await write(write_stream, current++)
}
}
https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73
这是一个使用承诺(async/await)的速度优化版本。调用方必须检查它是否得到promise
,并且只有在这种情况下才必须调用await
。在每次调用时执行等待可能会使程序减慢 3 倍......
const write = (writer, data) => {
// return a promise only when we get a drain
if (!writer.write(data)) {
return new Promise((resolve) => {
writer.once('drain', resolve)
})
}
}
// usage
const run = async () => {
const write_stream = fs.createWriteStream('...')
const max = 1000000
let current = 0
while (current <= max) {
const promise = write(write_stream, current++)
// since drain happens rarely, awaiting each write call is really slow.
if (promise) {
// we got a drain event, therefore we wait
await promise
}
}
}