How to delay pipe initialisation until an upstream value has been discovered?



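I'm using busboy as middleware in my Express server to stream form data containing CSV files. These CSV files can have a varying number of extra config parameters, so I need to parse the first line to work out how many parameters there are before initialising the pipe to the csv parser. My approach looked like this: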

// HTML Form parser middleware for dealing with file uploads
router.post("*", (req: Request, res: Response, next: NextFunction) => {
    let busboy = new Busboy({ headers: req.headers });
    busboy.on("file", (fieldname, file, filename, encoding, mimetype) => {
        file.on("end", () => {
            console.log("File [" + fieldname + "] Finished");
        });
        // number of CSV parameters to be found by splitting first line
        let paramsLen: number;
        // first line variable. Outside data callback in case first line is split over multiple data chunks
        let firstLine = "";
        // line split regex. matches LF or CRLF (JavaScript has no \Z end-of-input anchor)
        const lineSplitReg: RegExp = /\r?\n/;
        return new Promise((f, r) => {
            file.on("data", data => {
                console.log("File [" + fieldname + "] got " + data.length + " bytes");
                if (!paramsLen) {
                    let strChunk = data.toString();
                    if (lineSplitReg.test(strChunk)) {
                        firstLine += strChunk.split(lineSplitReg)[0];
                        paramsLen = firstLine.split(",").length;
                        // paramsLen now found! init pipe to csv writeable
                        f();
                    } else {
                        // long line. continue reading in next data chunk
                        firstLine += strChunk;
                    }
                }
            });
        })
            .then(() => {
                let headers: string[] = [
                    "id",
                    "brand",
                    "product",
                    "serialNumber",
                    "site",
                    "area",
                    "location",
                    "longitude",
                    "latitude",
                ];
                // add extra config headers once paramsLen has been discovered
                let cNum = 1;
                for (let i = headers.length; i < paramsLen; i = i + 2) {
                    headers.push(`c${cNum}`);
                    headers.push(`v${cNum}`);
                    cNum++;
                }
                // piped too late: chunks already emitted to the "data" listener never reach the parser
                file.pipe(
                    csv({
                        headers,
                    }),
                );
            })
    });
    busboy.on("finish", () => {
        console.log("Done parsing form!");
        if (!importingDevicesFromCsv) {
            fulfill();
        }
    });
    req.pipe(busboy);
})

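The problem is that by the time the promise is fulfilled, the file readable stream has already consumed some or all of the file data, which means those chunks are never passed to the csv parser stream. So, given that we may have to read several data chunks beforehand, how can I read the stream data without consuming it until the pipe to the csv parser has been established?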
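
To make the data loss concrete, here is a minimal sketch that uses a `PassThrough` as a stand-in for the busboy `file` stream (an assumption for illustration; the sample rows are invented). Reading from the stream to inspect the first line removes those bytes from its internal buffer, so a parser piped in afterwards would not see them. Chunks delivered to a `"data"` listener are gone in the same way.

```typescript
import { PassThrough } from "stream";

// Stand-in for the busboy `file` stream (an assumption for this sketch).
const file = new PassThrough();
file.write("id,brand,product\n");
file.write("1,acme,widget\n");

// Inspecting the stream consumes whatever it returns.
const peeked = file.read() as Buffer;
const firstLine = peeked.toString().split(/\r?\n/)[0];

// Nothing is left in the buffer for a parser that is piped in afterwards.
const leftover = file.readableLength;

console.log(firstLine, leftover);
```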

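My solution was to create a promise that wraps a transform stream. The transform stream reads the data but does not release it, holding each chunk in an array together with its release callback. Once paramsLen has been discovered, the transform object fulfils the promise, the pipe is established, and finally the data held in the transform stream is drained. See below: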

// Transform comes from Node's built-in stream module
import { Transform } from "stream";

// HTML Form parser middleware for dealing with file uploads
router.post("*", (req: Request, res: Response, next: NextFunction) => {
    let busboy = new Busboy({ headers: req.headers });
    busboy.on("file", (fieldname, file, filename, encoding, mimetype) => {
        file.on("end", () => {
            console.log("File [" + fieldname + "] Finished");
        });
        file.on("data", data => {
            console.log("File [" + fieldname + "] got " + data.length + " bytes");
        });
        // state shared between the transform stream and the promise chain
        type TransformState = {
            dataArray: Array<[Buffer, Function]>;
            paramsLen: number | undefined;
            firstLine: string;
            lineSplitReg: RegExp;
            stream: Transform;
            drainDone: boolean;
            drain(): void;
        };
        return new Promise<TransformState>((f, r) => {
            let ts: TransformState = {
                dataArray: [],
                paramsLen: undefined,
                firstLine: "",
                // matches LF or CRLF (JavaScript has no \Z end-of-input anchor)
                lineSplitReg: /\r?\n/,
                drainDone: false,
                drain: () => {
                    // release every held chunk through its own callback
                    ts.dataArray.forEach(x => {
                        x[1](null, x[0]);
                    });
                    ts.drainDone = true;
                },
                stream: new Transform({
                    transform: (data: Buffer, enc, callback: Function) => {
                        // if drain finished pass data straight through
                        if (ts.drainDone) {
                            return callback(null, data);
                        }
                        // hold the chunk and its callback until drain() is called
                        ts.dataArray.push([data, callback]);
                        if (!ts.paramsLen) {
                            let strChunk = data.toString();
                            if (ts.lineSplitReg.test(strChunk)) {
                                ts.firstLine += strChunk.split(ts.lineSplitReg)[0];
                                ts.paramsLen = ts.firstLine.split(",").length;
                                f(ts);
                            } else {
                                // long line. continue reading in next data chunk
                                // NOTE: transform() is not called again until this chunk's
                                // callback runs, so a first line spanning chunks stalls here
                                ts.firstLine += strChunk;
                            }
                        }
                    },
                }),
            };
            // pipe into the transform stream itself, not the wrapper object
            file.pipe(ts.stream);
        })
            .then(ts => {
                let headers: string[] = [
                    "id",
                    "brand",
                    "product",
                    "serialNumber",
                    "site",
                    "area",
                    "location",
                    "longitude",
                    "latitude",
                ];
                // add extra config headers once paramsLen has been discovered
                let cNum = 1;
                for (let i = headers.length; i < ts.paramsLen; i = i + 2) {
                    headers.push(`c${cNum}`);
                    headers.push(`v${cNum}`);
                    cNum++;
                }
                ts.stream.pipe(
                    csv({
                        headers,
                    }),
                );
                // drain transform stream
                ts.drain();
            })
    });
    busboy.on("finish", () => {
        console.log("Done parsing form!");
        if (!importingDevicesFromCsv) {
            fulfill();
        }
    });
    req.pipe(busboy);
})
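
One caveat with holding transform callbacks: Node does not call `transform` again until the previous chunk's callback has run, so the approach above stalls if the first line does not end within the first chunk. The following is a sketch of a variant, not the code from the answer above, that buffers chunks and acknowledges each one until the first newline arrives. `headerGate`, `buildHeaders`, `countColumns` and `BASE_HEADERS` are hypothetical names introduced here, and the column count is a plain comma split, as in the original.

```typescript
import { Transform } from "stream";

// Fixed columns from the question; extra columns come in c/v pairs.
const BASE_HEADERS = [
  "id", "brand", "product", "serialNumber", "site",
  "area", "location", "longitude", "latitude",
];

// Same header-building loop as above, as a standalone function.
function buildHeaders(paramsLen: number): string[] {
  const headers = [...BASE_HEADERS];
  let cNum = 1;
  for (let i = headers.length; i < paramsLen; i += 2) {
    headers.push(`c${cNum}`, `v${cNum}`);
    cNum++;
  }
  return headers;
}

// Counts comma-separated columns, ignoring a trailing carriage return.
function countColumns(line: string): number {
  return line.replace(/\r$/, "").split(",").length;
}

// Hypothetical helper: withholds output until the first line is complete,
// reports its column count once, then behaves like a pass-through.
function headerGate(onParamsLen: (paramsLen: number) => void): Transform {
  const held: Buffer[] = [];
  let found = false;

  // Reports the column count and returns everything buffered so far.
  const release = (firstLine: string): Buffer => {
    found = true;
    onParamsLen(countColumns(firstLine));
    return Buffer.concat(held.splice(0));
  };

  return new Transform({
    transform(chunk, _encoding, callback) {
      if (found) return callback(null, chunk);
      held.push(chunk as Buffer);
      const text = Buffer.concat(held).toString();
      const newline = text.indexOf("\n");
      // No newline yet: acknowledge the chunk so the next one is delivered.
      if (newline === -1) return callback();
      callback(null, release(text.slice(0, newline)));
    },
    flush(callback) {
      // Input ended without any newline: treat what we have as the first line.
      if (found || held.length === 0) return callback();
      callback(null, release(Buffer.concat(held).toString()));
    },
  });
}
```

In the handler this could be wired up as `const gate = headerGate(n => gate.pipe(csv({ headers: buildHeaders(n) })))` followed by `file.pipe(gate)`. Output pushed before a destination is attached waits in the transform's readable buffer, so no chunk is dropped.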
