节点 - 可读的流管()在for循环中覆盖以前的流



我试图将数据集合传输到多个文件,其中包括以下代码:

for (var key in data) {
  // skip if collection length is 0
  if (data[key].length > 0) {
    // Use the key and jobId to open file for appending
    let filePath = folderPath + '/' + key + '_' + jobId + '.txt';
    // Using stream to append the data output to file, which should perform better when file gets big
    let rs = new Readable();
    let n = data[key].length;
    let i = 0;
    rs._read = function () {
      rs.push(data[key][i++]);
      if (i === n) {
        rs.push(null);
      }
    };
    rs.pipe(fs.createWriteStream(filePath, {flags: 'a', encoding: 'utf-8'}));
  }
}

但是,我最终将所有文件都用相同的数据填充,这是data对象中最后一个键的数组。看来每个循环的读取器流都被覆盖了,并且pipe()到Writable stream直到for循环完成之前才启动。怎么可能?

因此,您代码可能不起作用的原因是rs._read方法是异步称为rynchrony的,并且您的密钥变量范围范围范围范围(由于VAR关键字)。

您创建的每个RS流都指向相同的变量,这是关键的,在主循环的末尾,每个回调都具有相同的值。当您将" var"更改为"让"时,将在每个迭代中创建新密钥变量,并且它将解决您的问题(_Read函数将拥有其自己的密钥变量副本,而不是共享的副本)。

如果将其更改为让它起作用。

这正在发生,因为您在loop语句中定义的key未被块分组。首先,这不是问题,但是当您在rs._read函数内部创建闭合时,所有后续流读取都使用了最后已知值,这是data数组的最后一个值。

当我们这样做时,我可以提出一些重构,以使代码清洁器和更重复使用:

const writeStream = (folderPath, index, jobId) => {
    const filePath = `${folderPath}/${index}_${jobId}.txt`;
    return fs.createWriteStream(filePath, {
        flags: 'a', encoding: 'utf-8'
    });
}
data.forEach((value, index) => {
    const length = value.length;
    if (length > 0) {
        const rs = new Readable();
        const n = length;
        let i = 0;
        rs._read = () => {
            rs.push(value[i++]);
            if (i === n) rs.push(null);
        }
        rs.pipe(writeStream(folderPath, index, jobId));
    }
});

相关内容

  • 没有找到相关文章

最新更新