使用 Node.js 管道流下载大文件导致大量内存占用和 OOM 错误



我使用 Node.js 从服务器下载大型文件(300MB),并将响应通过管道写入文件写入流。就我对 Node.js 中管道的理解而言,数据流由 Node 管理,我不必自己处理 drain 等事件。我面临的问题是:运行我的应用程序的 Docker 容器的内存占用与所下载文件的大小相当(即文件似乎被保存在内存中)。即使我在 Docker 中删除了该文件,这部分内存占用仍然存在。我附上了用于创建请求和管道的代码,供参考。该代码功能正常,但会导致严重的性能问题,如大量内存/CPU 占用,并最终因 OOM 错误而崩溃。我不明白我做错了什么。

// NOTE(review): `request` here is configured with requestretry options
// (maxAttempts / retryDelay / retryStrategy). Per the answer below, requestretry
// always invokes the underlying `request` with a callback, which makes it buffer
// the entire response body in memory even though the body is also piped to a
// file. That is the likely cause of the memory growth / OOM described above.
let req = request({
  url: firmwareURL,
  maxAttempts: 5,
  retryDelay: 5000,
  retryStrategy: request.RetryStrategies.HTTPOrNetworkError,
});

// 1. Handle the server response: a 200 is streamed into a local file; any other
//    status marks the operation as aborted.
req.on('response', (response) => {
  const { statusCode } = response;
  console.log(methodName, 'Download response statusCode:', statusCode);

  if (statusCode !== 200) {
    // 1.3 Non-200 status: flag the operation as aborted so later request
    //     events are ignored.
    console.log(methodName, 'File not found on server. res.statusCode:', statusCode);
    abortOperation = true;
    isStarted = "no";
    _deleteProgressPointer(ip);
    return;
  }

  abortOperation = false;
  isStarted = "yes";

  // 1.1 Target file: <firmwareDirectory>/<ip>/<firmwareVersion>.<extension taken
  //     from the URL's last "." segment>
  fileStoragePath = `${firmwareDirectory}/${ip}`;
  console.log("filestoragepath is", fileStoragePath);
  const extension = firmwareURL.split(".").pop();
  fileName = `${firmwareVersion}.${extension}`;
  tempFile = `${fileStoragePath}/${fileName}`;
  console.log("tempfile is", tempFile);

  // 1 MiB (2 ** 20 bytes) internal write buffer.
  writestream = fs.createWriteStream(tempFile, { highWaterMark: 2 ** 20 });
  writestream.on('error', (err) => {
    // A failed write stream aborts the operation.
    console.log('Error while creating a file write stream' + err);
    abortOperation = true;
    isStarted = "no";
    _deleteProgressPointer(ip);
  });

  // 1.2 Content length reported by the server (NaN if the header is absent).
  size = parseInt(response.headers['content-length'], 10);
  console.log(methodName, 'File size is:', size);
  req.pipe(writestream);
});
// 3. Request-level failure: flag the operation as aborted (so the 'end'
//    handler, which checks abortOperation, skips its work) and clear the
//    progress pointer for this device.
req.on('error', function onRequestError(requestError) {
  console.log(methodName, 'File not found on server:', requestError);
  isStarted = "no";
  abortOperation = true;
  _deleteProgressPointer(ip);
});
// 4. After stream is received close the connection
// When the request emits 'end' and the operation was not aborted, ends the
// file write stream; once it emits 'finish', checks the URL's extension
// against ALLOWED_EXTENSION and removes the downloaded file in either case,
// then clears the progress pointer.
// NOTE(review): this fragment is unbalanced as posted. The `});` after the
// fileio.removeFile(tempFile) call matches no open call, and neither the
// req.on('end', ...) callback nor the `if (!abortOperation)` block is ever
// closed, so it will not parse as-is.
req.on('end', () => {
if (!abortOperation) {
if (null !== writestream) {
// NOTE(review): req.pipe(writestream) presumably already ends the destination
// when the source ends, which would make this explicit end() redundant. Confirm.
writestream.end();
writestream.on('finish', function () {
console.log(methodName, `File successfully downloaded for device ${ip} of firmware version ${firmwareVersion}`);
try {
// file extraction/storage operation
// further check whether the file extension is valid or not
if (ALLOWED_EXTENSION.includes(firmwareURL.split(".").pop())) {
try {
//req.unpipe(writestream);
fileio.removeFile(tempFile); //deleting downloaded file to avoid storage issues
});
console.log("upgrade ended");
// NOTE(review): upgradeOp is not defined in this snippet, and an event
// listener's return value is ignored by the emitter, so this value is not
// delivered to any caller.
return upgradeOp;
} catch (error) {
// The message mentions renaming, but this try block only removes the file.
console.log(`Error while renamining file: ${tempFile}`);
}
} else {
// Extension not allowed: remove the downloaded file.
console.log(methodName, ` Not an valid file extension: ${tempFile}`);
fileio.removeFile(tempFile);
console.log(methodName, ` Invalid: ${tempFile} removed`);
}
// delete the progress pointer
_deleteProgressPointer(ip);
} catch (error) {
// delete the progress pointer
_deleteProgressPointer(ip);
console.log(methodName, `Error during read/write operation :${error}`);
}
});
}

问题在于您使用的是 requestretry 包,它并不真正支持流式传输。它总是以回调方式调用 request,并返回一个以完整响应 resolve 的 promise。一旦提供了这样的回调,request 库就会读取整个响应体,从而将整个响应缓冲在内存中(即使您同时把响应 pipe 到了文件)。这不是您想要的。

我看不出仅用 requestretry 实现流式传输的方法,所以您应该直接使用 request 包(或者,鉴于 request 已被弃用,改用它的某个后继库),并自行实现重试逻辑。

最新更新