我正在使用lento(用于Presto的Streaming Node.js客户端,"用于大数据的分布式SQL查询引擎"(来查询数据库。Lento的createRowStream
将sql查询作为字符串或Buffer,并返回生成行的可读流。
在返回结果行流之前,我需要对结果进行一些预处理(stream.pipe()
为我做这件事(,并将其转换为CSV格式(csvStringify为我做那件事(
流结束后,我使用resolve()
回调解析promise,并记录流的行数。但是,与流式传输的行数相比,实际返回的行数更少。例如,如果日志显示count为10000(变量rowsCnt
(,则返回的行数将接近6000。
返回的行数不一致的原因是什么?
请检查以下导入和代码片段:
import csvStringify from 'csv-stringify';
import {Request, Response} from 'express';
import lento from 'lento';
import streamTransform from 'stream-transform';
async getCSVRows(res: Response, sqlQueries: sqlDto[]): Promise<void> {
const result = [];
const lentoClient = ... code to create instance of lento client
for (let index = 0; index < sqlQueries.length; index++) {
const csvStream = csvStringify({header: index == 0 ? true : false});
queryResult = executeQuery(res, sqlQueries[index], lentoClient, csvStream)
result.push(queryResult)
}
await Promise.all(result);
res.end()
}
async executeQuery(
res: Response,
sqlQuery: sqlDto,
lentoClient: any,
csvStream: csvStringify.Stringifier
): Promise<Response> {
return new Promise(async (resolve) => {
const rowsStream = lentoClient.createRowStream(sqlQuery.query);
let rowsCnt = 0;
rowsStream.on('data', function() {})
.pipe(
streamTransform((row: any) => {
// process row
rowsCnt++;
return row;
}),
)
.pipe(csvStream)
.pipe(res, {end: false});
rowsStream.on('error', (err: Error) => {
// log error
throw err;
});
rowsStream.on('end', () => {
resolve(res);
console.log('Rows count: ' + rowsCnt);
});
});
}
注:使用的框架是NestJS
我认为您过早地结束了响应,而管道中仍在进行处理。在您的代码中,当rowsStream发出"end"事件时,并不意味着处理结束。这意味着查询结果的最后一块在管道中,但仍在处理并发送到客户端。您需要在所有处理完成并且管道为空之后结束响应。这很棘手,因为在同一响应中要处理多个查询,所以要使用{end:false}选项。正因为如此;res";不会发出结束事件,所以您必须以其他方式检测处理的结束。
以下是我的看法:mergeStream(…csvStreams(将在所有查询处理完毕时发出"end",并结束响应。
CSV的头行仍然不能保证在写入响应的第一个块中。
import csvStringify from 'csv-stringify';
import { Request, Response } from 'express';
import lento from 'lento';
import streamTransform from 'stream-transform';
import mergeStream from 'merge-stream';
createCSVRowStream(
sqlQuery: sqlDto,
lentoClient: any,
csvStream: csvStringify.Stringifier,
) {
return lentoClient
.createRowStream(sqlQuery.query)
.pipe(
streamTransform((row: any) => {
// process row
return row;
}),
)
.pipe(csvStream);
}
getCSVRows(res: Response, sqlQueries: sqlDto[]) {
//const lentoClient = ... code to create instance of lento client
const csvStreams = sqlQueries.map((sqlQuery, index) =>
createCSVRowStream(
sqlQuery,
lentoClient,
csvStringify({ header: index == 0 ? true : false }),
),
);
res.status(200);
mergeStream(...csvStreams).pipe(res);
}