Streaming the results of an aggregation query with Mongoose using async iterators



I want to stream the results of an aggregation query with Mongoose so that a client can handle a huge JSON response (which will eventually be piped into a CSV converter).

My code so far:

const pipeline = [
  {
    $unwind: {
      path: '$samples',
      // The name of a new field to hold the array index of the element.
      includeArrayIndex: 'num_sample',
      preserveNullAndEmptyArrays: true,
    },
  },
  {
    $limit: 10000,
  },
  {
    $project: {
      _id: 0,
      t: '$samples.t',
      station: '$station.name',
      loc: '$station.location',
      data: '$samples.data',
    },
  },
];
// const samples = await fixed.aggregate([pipeline]);
const cursor = fixed
  .aggregate(pipeline)
  .cursor({ batchSize: 1000 })
  .exec();
res.writeHead(200, { 'content-type': 'application/json' });
res.write('[');
await cursor.eachAsync(async (doc, i) => {
  res.write(JSON.stringify(doc));
  res.write(',');
});
res.write('{}]');
res.end();

But how can I pipe the response into a CSV converter such as json2csv? And is the code above functionally correct? I have to write extra characters into the response stream to produce properly formatted JSON, but the workaround I found (the trailing {}) introduces an empty record into the final JSON: I haven't found a way to write a ',' after every document returned by the Mongoose cursor except the last one, so I had to add that empty record.
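One way to avoid the empty record, as a minimal sketch reusing the fixed model and pipeline above: write the separator before each document instead of after it, using the index that eachAsync passes to its callback.

// Minimal sketch (same fixed model and pipeline as above):
// write the ',' before every document except the first,
// so no placeholder record is needed at the end.
const cursor = fixed
  .aggregate(pipeline)
  .cursor({ batchSize: 1000 })
  .exec();
res.writeHead(200, { 'content-type': 'application/json' });
res.write('[');
await cursor.eachAsync((doc, i) => {
  if (i > 0) res.write(',');
  res.write(JSON.stringify(doc));
});
res.write(']');
res.end();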

This is a stream, which means you have to pipe that stream into res, with something like the following:

cursor.pipe(JSONStream.stringify()).pipe(res)

See this question for more details.
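For example, wired into an Express handler (a sketch that assumes the fixed model and pipeline from the question, the JSONStream package, and a hypothetical /api/samples route):

const JSONStream = require('JSONStream');

// JSONStream.stringify() emits the opening '[', the ',' separators and the
// closing ']' itself, so no manual bookkeeping is needed.
app.get('/api/samples', (req, res) => {
  res.writeHead(200, { 'content-type': 'application/json' });
  fixed
    .aggregate(pipeline)
    .cursor({ batchSize: 1000 })
    .exec()
    .pipe(JSONStream.stringify())
    .pipe(res);
});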

I finally solved this with the code below.

Any suggestions for improving the solution are welcome.

// Dependencies assumed by this handler (exact import paths are project-specific):
// moment, JSONStream, Transform from 'json2csv', a stream string-replace helper
// (e.g. replacestream) bound to `replace`, the catchAsync wrapper and the
// `fixed` Mongoose model.
exports.get_FIXED_Samples_CSV = catchAsync(async (req, res, next) => {
  // retrieve start and end dates
  // if start is not provided or invalid, set it to the current date - 30 days
  const startDate =
    moment(req.query.start).isValid() && req.query.start
      ? moment(new Date(req.query.start))
      : moment(new Date()).subtract(30, 'd');
  // if end is not provided or invalid, set it to the current date
  const endDate =
    moment(req.query.end).isValid() && req.query.end
      ? moment(new Date(req.query.end))
      : moment(new Date());
  // retrieve the station name and check that it is valid; if not, set it to null
  const station = [
    'AQ101',
    'AQ102',
    'AQ103',
    'AQ104',
    'AQ105',
    'AQ106',
    'AQ107',
  ].includes(req.query.station)
    ? req.query.station
    : null;
  // eslint-disable line no-unused-vars
  const pipeline = [
    // sort by date DESC and station.name ASC
    {
      $sort: {
        'station.name': 1,
        date: -1,
      },
    },
    // unwind the samples array, adding a num_sample counter
    {
      $unwind: {
        path: '$samples',
        // The name of a new field to hold the array index of the element.
        includeArrayIndex: 'num_sample',
        preserveNullAndEmptyArrays: true,
      },
    },
    // ~~~~~~~~~~~~ set limit on returned docs ~~~~~~~~~~~~~~~~~~~
    {
      $limit: 216000,
    },
    // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    // select the fields to return
    {
      $project: {
        _id: 0,
        t: '$samples.t',
        station: '$station.name',
        loc: '$station.location',
        data: '$samples.data',
      },
    },
  ];
  // add a $match stage as the first stage of the pipeline
  if (station) {
    // if a valid station is provided
    pipeline.unshift({
      $match: {
        'station.name': station,
        date: {
          $gte: new Date(startDate.format('YYYY-MM-DD')),
          $lte: new Date(endDate.format('YYYY-MM-DD')),
        },
      },
    });
  } else {
    // if the station is invalid or not provided
    pipeline.unshift({
      $match: {
        date: {
          $gte: new Date(startDate.format('YYYY-MM-DD')),
          $lte: new Date(endDate.format('YYYY-MM-DD')),
        },
      },
    });
  }
  // transform applied to each record while generating the CSV
  const custTransf = (item, strFormat = 'DD/MM/YYYY HH:mm:ss') => ({
    utc: item.t,
    t: moment(item.t).format(strFormat),
    station: item.station,
    ...item.data,
  });
  // unwind the samples properties and flatten arrays
  const transforms = [
    // flatten({ objects: false, arrays: true }),
    custTransf,
  ];
  // const fields = ['t', 'station', 'data'];
  const opts = { transforms };
  const transformOpts = { highWaterMark: 8192 };
  const pipeTransf = new Transform(opts, transformOpts);
  // remove the "data." prefix from the generated field names
  const regex = /(data\.)/gi;
  const filename = 'FixedStations';
  const strAtt = `attachment;filename=${filename}-${startDate.format(
    'YYYY-MMM-DD'
  )}-${endDate.format('YYYY-MMM-DD')}.csv`;
  res.setHeader('Content-Type', 'text/csv');
  res.setHeader(
    'Content-Disposition',
    strAtt
    // 'attachment;filename=download.csv'
  );
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Pragma', 'no-cache');
  fixed
    .aggregate(pipeline)
    .allowDiskUse(true)
    // JSONStream.stringify() serializes the documents downstream, so no
    // cursor-level transform option is needed here
    .cursor({ batchSize: 1000 })
    .exec()
    .pipe(JSONStream.stringify())
    .pipe(pipeTransf)
    .pipe(replace(regex, ''))
    .pipe(res);
});
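In short: the aggregation cursor streams plain documents, JSONStream.stringify() turns that object stream into JSON text, json2csv's Transform parses the JSON and emits CSV rows (applying custTransf to each record), the replace step removes the data. prefix from the CSV output, and everything is piped straight into the response, so the full result set is never held in memory.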
