Process a file line by line and keep the line order in the output



So I'm writing a NodeJS route where the user uploads a file (a buffer), it is processed line by line (each line requires a call to a REST API), and the result is written to another buffer that is sent back to the user as a file download.

Here is the route code:

app.post('/tokenizeFile', isLoggedIn, upload.single('file'), function(req, res){
    var file = req.file;
    // File validations
    if (!validator.validateFile(file)) return res.redirect('/?err=invalidFile');
    // Process file
    tokenizer.tokenizeFile(file, req).then((data) => {
        //res.setHeader('Content-Length', stat.size);
        res.setHeader('Content-Type', 'text/plain');
        res.setHeader('Content-Disposition', 'attachment; filename=tokenized.txt');
        res.write(data, 'binary');
        res.end();
    }).catch((err) => {
        res.redirect('/?err=' + err);
    });
});

Here is the tokenizer.tokenizeFile code:

tokenizeFile: function(file, req){
    actionLogger.info(`Request to tokenize ${file.originalname} received. Made by: ${req.user.displayName}`);
    return new Promise(function(resolve, reject){
        var fileProcessPromise = Promise.resolve();
        var lineReader = require('readline').createInterface({
            input: require('streamifier').createReadStream(file.buffer)
        });
        var output = "";
        lineReader.on('line', function (line) {
            // Tokenize each line
            if (!validate.validateLine(line)) return reject(`Invalid line [${line}].`);
            fileProcessPromise = Tokenize(line)
                .then((data) => {
                    output += data + "\r\n";
                })
                .catch((err) => {
                    reject(`API didn't respond.`);
                });
        });
        lineReader.on('close', () => {
            fileProcessPromise.then(() => resolve(output));
        });
    });
}

The Tokenize function returns a promise because it makes an HTTP request to a RESTful API.
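
For reference, Tokenize might look something like this (a hypothetical sketch, since the question doesn't show it; the axios client, the endpoint URL, and the response shape are all assumptions):

const axios = require('axios');

// Hypothetical sketch: POST one line to the tokenizer API and resolve
// with the tokenized text. The URL and response field are made up.
function Tokenize(line) {
    return axios.post('https://tokenizer.example.com/tokenize', { text: line })
        .then(response => response.data.tokenized);
}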

The problem is that I need the output file to keep the same line order, and with the code above the order depends on how quickly each Tokenize call resolves.

Any ideas on how to achieve this?
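
To see why the order breaks: the 'line' callback fires in file order, but each Tokenize promise settles whenever its own HTTP round trip finishes, so the .then() appends run in completion order. A minimal sketch of the effect (fakeTokenize is a stand-in for the real API call):

var fakeTokenize = function (line) {
    return new Promise(function (resolve) {
        // Random latency stands in for the REST API round trip
        setTimeout(function () { resolve(line.toUpperCase()); }, Math.random() * 100);
    });
};

var output = "";
['one', 'two', 'three'].forEach(function (line) {
    fakeTokenize(line).then(function (data) { output += data + "\r\n"; });
});
setTimeout(function () { console.log(output); }, 200); // order varies from run to run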

1) The tokenizeFile code:

tokenizeFile: (file, req) => {
    actionLogger.info(`Request to tokenize ${file.originalname} received. Made by: ${req.user.displayName}`);
    return new Promise((resolve, reject) => {
        const lines = [], responses = [];
        const lineReader = require('readline').createInterface({
            input: require('streamifier').createReadStream(file.buffer)
        });
        // 1. read all lines into an array
        lineReader.on('line', line => {
            if (!validate.validateLine(line)) {
                return reject(`Invalid line [${line}].`);
            }
            lines.push(line);
        });

        lineReader.on('close', async () => {
            // 2. process every line sequentially
            try {
                for (const line of lines) {
                    const response = await Tokenize(line);
                    responses.push(response);
                }
                resolve(responses.join("\n"));
            } catch (error) {
                console.log(error);
                reject("API didn't respond");
            }
        });
    });
}

2) And the request part:

app.post(
    '/tokenizeFile',
    isLoggedIn,
    upload.single('file'),
    async (req, res) => {
        try {
            const file = req.file;
            if (!validator.validateFile(file)) {
                throw new Error('invalidFile');
            }
            const data = await tokenizer.tokenizeFile(file, req);
            res.setHeader('Content-Type', 'text/plain');
            res.setHeader('Content-Disposition', 'attachment; filename=tokenized.txt');
            res.write(data, 'binary');
            res.end();
        } catch (error) {
            res.redirect('/?err=' + error);
        }
    }
);
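
Note that awaiting each line inside the for loop serializes the HTTP calls, so a file with N lines costs N back-to-back round trips. The next answer avoids that by issuing all the requests up front and only awaiting them in order.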

Here is a solution using the Promise API combined with async/await:

tokenizeFile: function(file, req) {
    return new Promise((resolve, reject) => {
        actionLogger.info(`Request to tokenize ${file.originalname} received. Made by: ${req.user.displayName}`);
        var lineReader = require('readline').createInterface({
            input: require('streamifier').createReadStream(file.buffer)
        });
        var linePromises = [];
        var validationFailed = false;
        lineReader.on('line', line => {
            if (!validate.validateLine(line)) {
                reject(`Invalid line [${line}].`);
                validationFailed = true;
                lineReader.close(); // stop reading further lines
            }
            if (!validationFailed) {
                linePromises.push(Tokenize(line));
            }
        });
        lineReader.on('close', async () => {
            var outputStrings = [];
            for (var linePromise of linePromises) {
                var data = await linePromise;
                outputStrings.push(data);
            }
            var output = outputStrings.join("\r\n");
            resolve(output);
        });
    });
}

It works in two phases. First, it fires off all of the tokenizer request promises and pushes them into an array (in the same order as the original lines).

When the lineReader finishes ('close'), we have an array of in-flight promises; we loop over them, awaiting each one in order and pushing its result into an array. Since the promises were stored in line order, the results come out in line order too, no matter which request finished first. When that's done, we simply join the array into one big string and resolve the outer promise.
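
Since Promise.all resolves with its results in the same order as its input array, the awaiting loop in the 'close' handler can also be collapsed into a single call (a sketch of that variant; the \r\n separator is chosen to match the question's output format):

lineReader.on('close', () => {
    // Promise.all keeps results in input order regardless of which
    // request finished first; any rejection rejects the whole batch.
    Promise.all(linePromises)
        .then(outputStrings => resolve(outputStrings.join("\r\n")))
        .catch(reject);
});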

Read the file with a stream instead of buffering it to disk: https://www.npmjs.com/package/multipart-read-stream

Then split it into lines with a line stream (https://www.npmjs.com/package/readline-transform), run the lines through a parallel transform (https://www.npmjs.com/package/parallel-transform), and write the result to the response.
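
parallel-transform is a good fit here because it runs the transform function on several chunks concurrently while still emitting the results in input order, which is exactly the ordering guarantee this question needs.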

You may need to add stream.pause() and stream.resume() on 'drain', if the stream modules don't already do this, so that large content is processed on the fly.
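
pump already propagates backpressure through the pipeline, but if you wire streams together by hand, the classic pause/resume-on-drain pattern looks like this (a generic sketch, not specific to the packages above):

// Manual backpressure: stop reading when the writable's internal buffer
// is full, and resume once it emits 'drain'.
function pipeWithBackpressure(readable, writable) {
    readable.on('data', function (chunk) {
        if (!writable.write(chunk)) {
            readable.pause();
            writable.once('drain', function () { readable.resume(); });
        }
    });
    readable.on('end', function () { writable.end(); });
}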

Maybe use https://www.npmjs.com/package/stream-chunkify to write to the stream in larger chunks, since compression and other transfer tasks handle big chunks more easily than many small ones.

Maybe use https://www.npmjs.com/package/bufferedstream.

// npm install --save express readline-transform pump multipart-read-stream parallel-transform
var multipart = require('multipart-read-stream')
var pump = require('pump')
const ReadlineTransform = require('readline-transform');
var ParallelTransform = require('parallel-transform');
var express = require('express')

var app = express()
app.get("/", function (req, res, next) {
    res.writeHead(200, {'content-type': 'text/html'});
    res.end(
        '<form action="/upload" enctype="multipart/form-data" method="post">' +
        '<input type="text" name="title"><br>' +
        '<input type="file" name="upload" multiple=""><br>' +
        '<input type="submit" value="Upload">' +
        '</form>'
    );
})

app.post("/upload", function (req, res) {

    var multipartStream = multipart(req.headers, handler)
    // res.writeHead(200, {'content-type': 'text/plain'}); pump(req, res); return; // debug
    pump(req, multipartStream, function (err) {
        if (err) { res.end('server error'); return; }
        res.end()
    })

    var handled = false
    function handler (fieldname, file, filename) {
        // from the multipart-read-stream example:
        //console.log('reading file ' + filename + ' from field ' + fieldname)
        //var fileStream = fs.createWriteStream(path.join('/tmp', filename))
        //pump(file, fileStream)
        if (handled) return;
        handled = true;
        // res.writeHead(200, {'content-type': 'text/plain'}); pump(file, res); return; // debug
        const transform = new ReadlineTransform({ skipEmpty: false });
        pump(file, transform)
        //res.writeHead(200, {'content-type': 'text/plain'}); pump(transform, res); return; // debug

        var first = true;
        // maybe:
        var parallel = ParallelTransform(10, {objectMode: false}, function(data, callback) { // 10 is the parallelism level
            // this might be faster:
            //  if (first) {
            //      first = false;
            //      callback(null, "transformed:" + data);
            //  }
            //  else
            //      callback(null, "\r\n" + "transformed:" + data);

            (async () => {
                if (first) {
                    first = false;
                    return "transformed:" + data;
                } else {
                    return "\r\n" + "transformed:" + data;
                }
            })().then((data) => callback(null, data)).catch((error) => callback(error, ""));

        });
        pump(transform, parallel)
        //res.writeHead(200, {'content-type': 'text/plain'}); pump(parallel, res); return; // debug

        pump(parallel, res)
    }
}).listen(8080)
