I need to create a Zip file that consists of a selection of files (videos and images) stored in my S3 bucket.

The problem at the moment, using the code below, is that I quickly hit the memory limit on Lambda.
var async = require('async');
var AWS = require('aws-sdk');
var JSZip = require('jszip');

var s3 = new AWS.S3();
var zip = new JSZip();

// `files` and `bucket` are assumed to be defined earlier in the handler
async.eachLimit(files, 10, function (file, next) {
  var params = {
    Bucket: bucket, // bucket name
    Key: file.key
  };
  s3.getObject(params, function (err, data) {
    if (err) {
      console.log('file', file.key);
      console.log('get image files err', err, err.stack); // an error occurred
      next(err); // release the iterator on failure as well
    } else {
      console.log('file', file.key);
      zip.file(file.key, data.Body);
      next();
    }
  });
},
function (err) {
  if (err) {
    console.log('err', err);
  } else {
    console.log('zip', zip);
    var content = zip.generateNodeStream({
      type: 'nodebuffer',
      streamFiles: true
    });
    var params = {
      Bucket: bucket, // name of dest bucket
      Key: 'zipped/images.zip',
      Body: content
    };
    s3.upload(params, function (err, data) {
      if (err) {
        console.log('upload zip to s3 err', err, err.stack); // an error occurred
      } else {
        console.log(data); // successful response
      }
    });
  }
});
Is this possible using Lambda, or should I be looking at a different approach?

Is it possible to write to a compressed zip file on the fly, therefore eliminating the memory issue somewhat, or do I need to have the files collected before compression?

Any help would be much appreciated.
Okay, I got to do this today and it works. Direct Buffer to Stream, no disk involved. So memory or disk limitations won't be an issue here:
'use strict';

const AWS = require('aws-sdk');
AWS.config.update({ region: 'eu-west-1' });
const s3 = new AWS.S3({ apiVersion: '2006-03-01' });
const _archiver = require('archiver');

// This returns us a stream.. consider it as a real pipe sending fluid to the S3 bucket.. Don't forget it
const streamTo = (_bucket, _key) => {
  var stream = require('stream');
  var _pass = new stream.PassThrough();
  s3.upload({ Bucket: _bucket, Key: _key, Body: _pass }, (_err, _data) => { /* ...Handle Errors Here */ });
  return _pass;
};

exports.handler = async (_req, _ctx, _cb) => {
  var _keys = ['list of your file keys in s3'];

  var _list = await Promise.all(_keys.map(_key => new Promise((_resolve, _reject) => {
    s3.getObject({ Bucket: 'bucket-name', Key: _key }).promise()
      .then(_data => _resolve({ data: _data.Body, name: `${_key.split('/').pop()}` }))
      .catch(_reject);
  }))).catch(_err => { throw new Error(_err); });

  await new Promise((_resolve, _reject) => {
    var _myStream = streamTo('bucket-name', 'fileName.zip'); // Now we instantiate that pipe...
    var _archive = _archiver('zip');
    _archive.on('error', err => { throw new Error(err); });

    // Your promise gets resolved when the fluid stops running... so that's when you get to close and resolve
    _myStream.on('close', _resolve);
    _myStream.on('end', _resolve);
    _myStream.on('error', _reject);

    _archive.pipe(_myStream); // Pass that pipe to _archive so it can push the fluid straight down to the S3 bucket
    _list.forEach(_itm => _archive.append(_itm.data, { name: _itm.name })); // And then we start adding files to it
    _archive.finalize(); // Tell it that's all we want to add. When it finishes, the promise resolves in one of those events up there
  }).catch(_err => { throw new Error(_err); });

  _cb(null, {}); // Handle response back to server
};
I formatted the code based on @iococker's answer.

Main entry:
// index.js

'use strict';
const S3Zip = require('./s3-zip');

const params = {
  files: [
    {
      fileName: '1.jpg',
      key: 'key1.JPG'
    },
    {
      fileName: '2.jpg',
      key: 'key2.JPG'
    }
  ],
  zippedFileKey: 'zipped-file-key.zip'
};

exports.handler = async event => {
  const s3Zip = new S3Zip(params);
  await s3Zip.process();

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Zip file created successfully!'
    })
  };
};
Zip file util:
// s3-zip.js

'use strict';
const AWS = require('aws-sdk');
const Archiver = require('archiver');
const Stream = require('stream');
const https = require('https');

const sslAgent = new https.Agent({
  keepAlive: true,
  rejectUnauthorized: true
});
sslAgent.setMaxListeners(0);

AWS.config.update({
  httpOptions: {
    agent: sslAgent,
  },
  region: 'us-east-1'
});

module.exports = class S3Zip {
  constructor(params, bucketName = 'default-bucket') {
    this.params = params;
    this.BucketName = bucketName;
  }

  async process() {
    const { params, BucketName } = this;
    const s3 = new AWS.S3({ apiVersion: '2006-03-01', params: { Bucket: BucketName } });

    // create read streams for all the source files and keep them together with their target names
    const s3FileDwnldStreams = params.files.map(item => {
      const stream = s3.getObject({ Key: item.key }).createReadStream();
      return {
        stream,
        fileName: item.fileName
      };
    });

    const streamPassThrough = new Stream.PassThrough();

    // upload the pass-through stream to S3 as the zipped file
    const uploadParams = {
      ACL: 'private',
      Body: streamPassThrough,
      ContentType: 'application/zip',
      Key: params.zippedFileKey
    };
    const s3Upload = s3.upload(uploadParams, (err, data) => {
      if (err) {
        console.error('upload err', err);
      } else {
        console.log('upload data', data);
      }
    });
    s3Upload.on('httpUploadProgress', progress => {
      // console.log(progress); // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
    });

    // create the archiver
    const archive = Archiver('zip', {
      zlib: { level: 0 }
    });
    archive.on('error', (error) => {
      throw new Error(`${error.name} ${error.code} ${error.message} ${error.path} ${error.stack}`);
    });

    // connect the archiver to the upload pass-through stream and pipe all the download streams into it
    await new Promise((resolve, reject) => {
      console.log('Starting upload of the output Files Zip Archive');

      streamPassThrough.on('close', resolve);
      streamPassThrough.on('end', resolve);
      streamPassThrough.on('error', reject);

      archive.pipe(streamPassThrough);
      s3FileDwnldStreams.forEach((s3FileDwnldStream) => {
        archive.append(s3FileDwnldStream.stream, { name: s3FileDwnldStream.fileName });
      });
      archive.finalize();
    }).catch((error) => {
      throw new Error(`${error.code} ${error.message} ${error.data}`);
    });

    // Finally wait for the uploader to finish
    await s3Upload.promise();
  }
};
The other solutions work great for a moderate number of files (fewer than about 60). If they have to handle more files than that, they just quit silently, without any error, because they open too many streams at once.

This solution is inspired by https://gist.github.com/amiantos/16bacc9ed742c91151fcf1a41012445e

It is a working solution that performs well even with many files (300+) and returns a presigned URL of the zip containing the files.

Main lambda:
import AWS from 'aws-sdk';
import archiver from 'archiver';
import stream from 'stream';

const S3 = new AWS.S3({
  apiVersion: '2006-03-01',
  signatureVersion: 'v4',
  httpOptions: {
    timeout: 300000 // 5min, should match the Lambda function timeout
  }
});

const UPLOAD_BUCKET_NAME = "my-s3-bucket";
const URL_EXPIRE_TIME = 5 * 60;

export async function getZipSignedUrl(event) {
  const prefix = "uploads/id123123";             // replace this with your S3 prefix
  const files = ["12314123.png", "56787567.png"]; // replace this with your files

  if (files.length == 0) {
    console.log("No files to zip");
    return result(404, "No pictures to download");
  }
  console.log("Files to zip: ", files);

  try {
    const filesToZip = files.map(file => ({
      fileName: file,
      key: prefix + '/' + file,
      type: "file"
    }));
    const destinationKey = prefix + '/' + 'uploads.zip';
    console.log("files: ", filesToZip);
    console.log("destinationKey: ", destinationKey);

    await streamToZipInS3(filesToZip, destinationKey);
    const presignedUrl = await getSignedUrl(UPLOAD_BUCKET_NAME, destinationKey, URL_EXPIRE_TIME, "uploads.zip");
    console.log("presignedUrl: ", presignedUrl);

    if (!presignedUrl) {
      return result(500, null);
    }
    return result(200, presignedUrl);
  }
  catch (error) {
    console.error(`Error: ${error}`);
    return result(500, null);
  }
}
Helper functions:
export function result(code, message) {
  return {
    statusCode: code,
    body: JSON.stringify({
      message: message
    })
  };
}

export async function streamToZipInS3(files, destinationKey) {
  await new Promise((resolve, reject) => {
    const zipStream = streamTo(UPLOAD_BUCKET_NAME, destinationKey, resolve, reject);
    zipStream.on("error", reject);

    const archive = archiver("zip");
    archive.on("error", err => {
      throw new Error(err);
    });
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] == "file") {
        archive.append(getStream(UPLOAD_BUCKET_NAME, file["key"]), {
          name: file["fileName"]
        });
      }
    }
    archive.finalize();
  })
  .catch(err => {
    console.log(err);
    throw new Error(err);
  });
}

function streamTo(bucket, key, resolve, reject) {
  const passthrough = new stream.PassThrough();
  S3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
    (err, data) => {
      if (err) {
        console.error('Error while uploading zip');
        reject(err);
        return;
      }
      console.log('Zip uploaded');
      resolve();
    }
  ).on("httpUploadProgress", progress => {
    console.log(progress);
  });
  return passthrough;
}

function getStream(bucket, key) {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();

  // Only open the S3 read stream once a consumer actually attaches a "data" listener,
  // so hundreds of files don't open hundreds of connections up front.
  passThroughStream.on("newListener", event => {
    if (!streamCreated && event == "data") {
      const s3Stream = S3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);
      streamCreated = true;
    }
  });
  return passThroughStream;
}

export async function getSignedUrl(bucket: string, key: string, expires: number, downloadFilename?: string): Promise<string | null> {
  const exists = await objectExists(bucket, key);
  if (!exists) {
    console.info(`Object ${bucket}/${key} does not exist`);
    return null;
  }

  let params = {
    Bucket: bucket,
    Key: key,
    Expires: expires,
  };
  if (downloadFilename) {
    params['ResponseContentDisposition'] = `inline; filename="${encodeURIComponent(downloadFilename)}"`;
  }

  try {
    const url = S3.getSignedUrl('getObject', params);
    return url;
  } catch (err) {
    console.error(`Unable to get URL for ${bucket}/${key}`, err);
    return null;
  }
}
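Note: objectExists is referenced above but its implementation is not shown. A minimal version, assuming the same aws-sdk v2 S3 client, could be a simple HEAD request:

// Hypothetical helper used by getSignedUrl above: treat any HEAD error (e.g. NotFound, 403) as "does not exist".
export async function objectExists(bucket: string, key: string): Promise<boolean> {
  try {
    await S3.headObject({ Bucket: bucket, Key: key }).promise();
    return true;
  } catch (err) {
    return false;
  }
}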
Using streams may be tricky, since I'm not sure how you could pipe multiple streams into one object. I've done this several times using a standard file object instead. It's a multi-step process and it's quite fast. Remember that Lambda operates in Linux, so you have all Linux resources at hand, including the system /tmp directory. A sketch of these steps follows the list.

- Create a sub-directory in /tmp, call it "transient" or whatever works for you
- Use s3.getObject() and write the file objects to /tmp/transient
- Use the GLOB package to generate an array[] of paths from /tmp/transient
- Loop the array and zip.addLocalFile(array[i])
- zip.writeZip('/tmp/files.zip')
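A minimal sketch of those steps, assuming aws-sdk v2, the glob package (v7-style glob.sync) and adm-zip (the library whose addLocalFile/writeZip calls the list refers to) are bundled with the function; the bucket name and keys are placeholders:

// Sketch only: download to /tmp, zip from disk, upload the result back to S3.
import fs from "fs";
import path from "path";
import AWS from "aws-sdk";
import glob from "glob";
import AdmZip from "adm-zip";

const s3 = new AWS.S3();

export const handler = async (): Promise<void> => {
  const bucket = "my-bucket";                    // placeholder
  const keys = ["videos/a.mp4", "images/b.jpg"]; // placeholders
  const workDir = "/tmp/transient";

  // 1. create a scratch sub-directory under /tmp
  fs.mkdirSync(workDir, { recursive: true });

  // 2. download each object and write it to /tmp/transient
  for (const key of keys) {
    const { Body } = await s3.getObject({ Bucket: bucket, Key: key }).promise();
    fs.writeFileSync(path.join(workDir, path.basename(key)), Body as Buffer);
  }

  // 3. build an array of local paths with glob
  const paths = glob.sync(`${workDir}/*`);

  // 4. add each local file to the archive, then 5. write the zip to /tmp
  const zip = new AdmZip();
  for (const p of paths) {
    zip.addLocalFile(p);
  }
  zip.writeZip("/tmp/files.zip");

  // finally, upload the finished archive back to S3
  await s3.upload({
    Bucket: bucket,
    Key: "zipped/files.zip",
    Body: fs.createReadStream("/tmp/files.zip"),
    ContentType: "application/zip"
  }).promise();
};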
I used a similar approach, but the issue I'm facing is that some files in the generated ZIP don't have the correct size (and the corresponding data). Is there any limitation on the size of the files this code can manage? In my case I'm zipping big files (a few larger than 1GB) and the overall amount of data may reach 10GB.

I don't get any error/warning message, so it seems everything works fine.

Any idea what may be happening?
You can use adm-zip, which lets you work with zip files directly on disk or in memory buffers. It is also simpler to use compared to the node archiver library, which also has an unresolved open issue.

TypeScript code:
import AdmZip from "adm-zip";
import { GetObjectCommand, GetObjectCommandOutput, PutObjectCommand, PutObjectCommandInput, S3Client } from "@aws-sdk/client-s3";

const s3 = new S3Client({});

export async function uploadZipFile(fileKeysToDownload: string[], bucket: string, uploadFileKey: string): Promise<void> {
  // create a new zip file using "adm-zip"
  const zipFile = new AdmZip();

  // Download the existing files in S3 using the GET API
  // (use a parallel fetch in your code, a plain for loop is shown here for simplicity)
  for (let i = 0; i < fileKeysToDownload.length; i++) {
    const data = await getObject(fileKeysToDownload[i], bucket);
    const byteArray = await data!.transformToByteArray();
    // add the byte array to the newly created zip file
    zipFile.addFile(fileKeysToDownload[i], Buffer.from(byteArray));
  }

  // Convert the zip file to a byte buffer
  const outputBody = zipFile.toBuffer();

  // upload the zip file to S3 using the PUT API
  await putObject(outputBody, uploadFileKey, bucket);
}

async function getObject(key: string, bucket: string) {
  const command: GetObjectCommand = new GetObjectCommand({ Bucket: bucket, Key: key });
  const response: GetObjectCommandOutput = await s3.send(command);
  return response.Body;
}

async function putObject(content: Buffer, key: string, bucket: string) {
  const input: PutObjectCommandInput = {
    Body: content,
    Bucket: bucket,
    Key: key,
    ContentType: "application/zip"
  };
  await s3.send(new PutObjectCommand(input));
}
Is this possible with Lambda, or should I look at a different approach -> Yes, it is possible.

Is it possible to write to a compressed zip file on the fly, therefore eliminating the memory issue somewhat, or do I need to have the files collected before compression -> Yes, use adm-zip with the approach above.
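For illustration only, a handler could invoke uploadZipFile along these lines; the bucket and key names here are placeholders, not part of the answer:

export const handler = async (): Promise<{ statusCode: number; body: string }> => {
  await uploadZipFile(
    ["images/1.jpg", "videos/2.mp4"], // keys of the objects to include (placeholders)
    "my-bucket",                      // source/destination bucket (placeholder)
    "zipped/images.zip"               // key of the zip that gets uploaded
  );
  return { statusCode: 200, body: JSON.stringify({ message: "Zip created" }) };
};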