获取到一个express响应的流管道的不完整数据



需要将DB表转换为csv报表。
如果我用一个查询立即卸载整个平板电脑,那么由于内存耗尽,应用程序崩溃。我决定按100行的部分从表中查询数据,将每行转换为报告的一行,并将其写入一个流,该流通过管道连接一个express响应。

这一切几乎是这样发生的:

  1. DB查询

    const select100Users = (maxUserCreationDateStr) => {
    return db.query(`
    SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr})
    ORDER BY created_at DESC LIMIT 100`);
    }
    

  2. 流初始化

    const { PassThrough } = require('stream');
    const getUserReportStream = () => {
    const stream = new PassThrough();
    writeUserReport(stream).catch((e) => stream.emit('error', e));
    return stream;
    };
    
  3. 用一个express响应给流加管道

    app.get('/report', (req, res) => {
    const stream = getUserReportStream();
    res.setHeader('Content-Type', 'application/vnd.ms-excel');
    res.setHeader(`Content-Disposition', 'attachment; filename="${ filename }"`);
    stream.pipe(res);
    });
    
  4. 最后我如何将数据写入流

    const writeUserReport(stream) => {
    let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000);
    let flag = true;
    stream.write(USER_REPORT_HEADER);
    while (flag) {
    const rows100 = await select100Users(maxUserCreationDateGlobal);
    console.log(rows100.length);
    if (rows100.length === 0) {
    flag = false;
    } else {
    let maxUserCreationDate = maxUserCreationDateGlobal;
    const users100 = await Promise.all(
    rows100.map((r) => {
    const created_at = r.created_at;
    const createdAt = new Date(created_at);
    if (created_at && createdAt.toString() !== 'Invalid Date') {
    const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000);
    maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber);
    }
    return mapUser(r); // returns a promise
    })
    );
    users100.forEach((u) => stream.write(generateCsvRowFromUser(u)));
    maxUserCreationDateGlobal = maxUserCreationDate;
    if (rows100.length < 100) {
    flag = false;
    console.log('***');
    }
    }
    }
    console.log('end');
    stream.end();
    };
    

结果,我在控制台中看到如下输出:

100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87  // 587
***
end

但是在下载的文件中,我得到401行(第一行带有USER_REPORT_HEADER)。感觉stream.end()在读取所有值之前关闭了流。

我尝试使用rxjs中的BehaviorSubject而不是PassThrough以类似的方式-结果是相同的..

我如何等待从流中读取我写在那里的所有数据?
或者有人可以推荐另一种方法来解决这个问题。

stream.write期望您传递回调作为第二个(或第三个参数),以知道写操作何时完成。除非之前的写操作完成,否则不能再次调用write。

所以一般来说,我建议将整个函数设置为异步并且每次调用stream.write时都将其封装到一个Promise中,例如
await new Promise((resolve, reject) => stream.write(data, (error) => {
if (error) {
reject(error);
return;
}
resolve();
});

显然,把它提取到某个方法中是有意义的。

编辑另外,我不认为这是真正的问题。我假设你的http连接只是在所有的抓取完成之前超时,所以服务器最终会关闭流一旦超时期限被满足。

相关内容

  • 没有找到相关文章

最新更新