需要将DB表转换为csv报表。
如果我用一个查询立即卸载整个平板电脑,那么由于内存耗尽,应用程序崩溃。我决定按100行的部分从表中查询数据,将每行转换为报告的一行,并将其写入一个流,该流通过管道连接一个express响应。
这一切几乎是这样发生的:
-
DB查询
const select100Users = (maxUserCreationDateStr) => { return db.query(` SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr}) ORDER BY created_at DESC LIMIT 100`); }
-
流初始化
const { PassThrough } = require('stream'); const getUserReportStream = () => { const stream = new PassThrough(); writeUserReport(stream).catch((e) => stream.emit('error', e)); return stream; };
-
用一个express响应给流加管道
app.get('/report', (req, res) => { const stream = getUserReportStream(); res.setHeader('Content-Type', 'application/vnd.ms-excel'); res.setHeader(`Content-Disposition', 'attachment; filename="${ filename }"`); stream.pipe(res); });
-
最后我如何将数据写入流
const writeUserReport(stream) => { let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000); let flag = true; stream.write(USER_REPORT_HEADER); while (flag) { const rows100 = await select100Users(maxUserCreationDateGlobal); console.log(rows100.length); if (rows100.length === 0) { flag = false; } else { let maxUserCreationDate = maxUserCreationDateGlobal; const users100 = await Promise.all( rows100.map((r) => { const created_at = r.created_at; const createdAt = new Date(created_at); if (created_at && createdAt.toString() !== 'Invalid Date') { const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000); maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber); } return mapUser(r); // returns a promise }) ); users100.forEach((u) => stream.write(generateCsvRowFromUser(u))); maxUserCreationDateGlobal = maxUserCreationDate; if (rows100.length < 100) { flag = false; console.log('***'); } } } console.log('end'); stream.end(); };
结果,我在控制台中看到如下输出:
100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87 // 587
***
end
但是在下载的文件中,我得到401行(第一行带有USER_REPORT_HEADER)。感觉stream.end()
在读取所有值之前关闭了流。
我尝试使用rxjs中的BehaviorSubject而不是PassThrough以类似的方式-结果是相同的..
我如何等待从流中读取我写在那里的所有数据?
或者有人可以推荐另一种方法来解决这个问题。
stream.write
期望您传递回调作为第二个(或第三个参数),以知道写操作何时完成。除非之前的写操作完成,否则不能再次调用write。
stream.write
时都将其封装到一个Promise中,例如
await new Promise((resolve, reject) => stream.write(data, (error) => {
if (error) {
reject(error);
return;
}
resolve();
});
显然,把它提取到某个方法中是有意义的。
编辑另外,我不认为这是真正的问题。我假设你的http连接只是在所有的抓取完成之前超时,所以服务器最终会关闭流一旦超时期限被满足。