AWS question: copying a large file to a different folder within the same bucket

I have a problem with a Lambda function. Here is the issue: my Lambda function needs to handle PutObject events:

When a PutObject event is handled, I have to copy the large file that was uploaded into a different folder of the same bucket.

I tried s3.copyObject(), s3.putObject(), and s3.createMultipartUpload() [implementing the whole loop to drive it: uploadPart, etc. ...], but none of them worked!

The event is captured, but then the function prints nothing to the console: it neither fails nor succeeds.

Lambda:

// dependencies
const AWS = require('aws-sdk');
const util = require('util');
const fs = require('fs');
// get reference to S3 client
const s3 = new AWS.S3();
const uploadPart = (params, chunk, partno, final, cb) => {
  console.log("##### Upload part: ", partno);
  s3.uploadPart({
    Body: chunk,
    Bucket: params.Bucket,
    Key: params.Key,
    UploadId: params.UploadId,
    PartNumber: partno
  }, (err, res) => {
    if (err) { console.log('## Error: part upload failed: ', err); return; }
    if (cb) cb(null, { size: chunk.length, ETag: res.ETag });
  });
};
const completeMultipartUpload = (params, PartMap) => {
  console.log("##### 4. INIT COMPLETE MULTIPART UPLOAD");
  s3.completeMultipartUpload({
    Bucket: params.Bucket,
    Key: params.Key,
    UploadId: params.UploadId,
    MultipartUpload: PartMap
  }, (err, data) => {
    if (err) { console.log('## Error: complete multipart upload failed: ', err); return; }
    console.log('###### 5. Upload completed: ', JSON.stringify(data));
  });
};
exports.handler = async (event, context, callback) => {
  // Read options from the event parameter.
  console.log("Reading options from event:\n", util.inspect(event, { depth: 5 }));

  const srcBucket = event.Records[0].s3.bucket.name;
  const srcKey    = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, " "));
  const maxchunksize = event.Records[0].s3.object.size;
  const dstBucket = "bucketsrctest";

  console.log("SRC KEY: ", srcKey, ", File Size: ", ((maxchunksize / 1024) / 1024), " MB");

  // Infer the file type from the file suffix.
  const typeMatch = srcKey.match(/\.([^.]*)$/);
  if (!typeMatch) {
    console.log("Could not determine the file type.");
    return;
  }

  // Check that the file type is supported.
  const fileType = typeMatch[1].toLowerCase();
  if (fileType != "csv") {
    console.log(`Unsupported file type: ${fileType}`);
    return;
  }

  const URI_PARTS = srcKey.split('/');
  const TOTAL_PARTS = URI_PARTS.length;

  const pre_file_folder = URI_PARTS[TOTAL_PARTS - 2];
  const hour = URI_PARTS[TOTAL_PARTS - 3];
  const day = URI_PARTS[TOTAL_PARTS - 4];
  const month = URI_PARTS[TOTAL_PARTS - 5];
  const year = URI_PARTS[TOTAL_PARTS - 6];
  const sub_folder = URI_PARTS[TOTAL_PARTS - 7];
  const main_folder = URI_PARTS[TOTAL_PARTS - 8];

  console.log("PATHS: ", URI_PARTS);

  // prepareData is not shown in the question; it derives the destination
  // folder, subfolder and renamed file from the source path.
  const dst = prepareData(main_folder);

  try {
    const finalDestinationPath = dst.folder + '/' + (dst.subfolder ? dst.subfolder + '/' + dst.renamedFile : dst.renamedFile);

    // Note: this params object is never used below, and CopySource would
    // need the form "sourceBucket/sourceKey" to be a valid copy source.
    const params = {
      Bucket: srcBucket,
      CopySource: srcKey,
      Key: finalDestinationPath
    };

    console.log("####1. INITIALIZE UPLOAD: ", finalDestinationPath);

    s3.createMultipartUpload({
      Bucket: dstBucket,
      Key: srcKey,
      ContentType: 'text/csv'
    }, (err, data) => {
      console.log("##### 2. INIT MULTIPART UPLOAD");
      if (err) { console.log('## Error: create multipart upload failed: ', err); return; }

      // Streams from the local filesystem, but the uploaded object only
      // exists in S3, not on the Lambda instance's disk.
      const file = fs.createReadStream(finalDestinationPath);
      let pi = 1;
      let partMap = [];
      let streamedLength = 0;
      let uploadedSize = 0;
      let curchunk = Buffer.alloc(0);

      const cmuParams = {
        Key: srcKey,
        Bucket: dstBucket,
        UploadId: data.UploadId
      };

      const Writable = require('stream').Writable;
      const ws = Writable();

      ws.oend = ws.end;
      ws.end = (chunk, encoding, callback) => {
        ws.oend(chunk, encoding, callback);
        uploadPart(cmuParams, curchunk, pi, true, (err, data) => {
          partMap.push({ ETag: data.ETag, PartNumber: pi });
          completeMultipartUpload(cmuParams, { Parts: partMap });
        });
      };

      ws._write = (chunk, enc, next) => {
        curchunk = Buffer.concat([curchunk, chunk]);
        streamedLength += chunk.length;
        if (curchunk.length > maxchunksize) {
          uploadPart(cmuParams, curchunk, pi, false, (err, data) => {
            uploadedSize += data.length;
            partMap.push({ ETag: data.ETag, PartNumber: pi });
            pi += 1;
            curchunk = Buffer.alloc(0);
            next();
          });
        } else {
          next();
        }
      };

      file.pipe(ws);
    });
  } catch (err) {
    console.log("Result error: ", err);
    return { statusCode: 500, body: err };
  }
};

This is probably because you are using an async Lambda handler. With an async handler, the invocation completes as soon as the promise returned by the handler resolves, and here that happens before the callback-based S3 code in the handler body ever gets to run.
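The pitfall can be reduced to the sketch below (an illustrative reconstruction, not the original code; the bucket and key names are hypothetical). The callback-style SDK call only registers a callback, nothing awaits it, so the handler's promise resolves immediately and Lambda freezes the execution environment before the callback can fire or log anything.

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

exports.handler = async (event) => {
  // Callback-style SDK call: this only *schedules* the callback.
  s3.copyObject({
    Bucket: "my-bucket",                       // hypothetical names
    CopySource: "my-bucket/incoming/file.csv",
    Key: "processed/file.csv"
  }, (err, data) => {
    // By the time S3 responds, the handler has already returned,
    // so this is never (reliably) reached or logged.
    console.log(err || data);
  });
  // Nothing is awaited: the async handler resolves here, ending the invocation.
};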

To fix this, you have to either modify the code to use a non-async handler, or keep the async handler and use the promise pattern shown in the AWS documentation.
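As a concrete illustration, here is a minimal sketch of the handler rewritten with the promise pattern, assuming aws-sdk v2 as in the original code; the "processed/" destination prefix is a placeholder. Two details worth noting: CopySource must be "sourceBucket/sourceKey", not just the key, and "folders" in S3 are merely key prefixes, so copying into a different folder of the same bucket just means writing the same object under a new key.

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

exports.handler = async (event) => {
  const srcBucket = event.Records[0].s3.bucket.name;
  const srcKey = decodeURIComponent(
    event.Records[0].s3.object.key.replace(/\+/g, " ")
  );

  // Same bucket, different "folder": only the key prefix changes.
  const dstKey = "processed/" + srcKey; // hypothetical target prefix

  // Awaiting .promise() keeps the invocation alive until S3 responds.
  const result = await s3.copyObject({
    Bucket: srcBucket,
    CopySource: `${srcBucket}/${srcKey}`, // URL-encode the key if it may contain special characters
    Key: dstKey
  }).promise();

  console.log("Copy completed:", JSON.stringify(result));
  return result;
};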

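One more caveat, since the question is about a large file: a single copyObject call is limited to objects of at most 5 GB. Beyond that you need a multipart copy with uploadPartCopy, which copies byte ranges server-side instead of streaming the data through Lambda (which is what the fs.createReadStream approach in the question attempts, even though the object only exists in S3). A minimal promise-based sketch, with hypothetical names and part size:

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Server-side multipart copy: no object data flows through Lambda.
async function multipartCopy(bucket, srcKey, dstKey, objectSize) {
  const partSize = 100 * 1024 * 1024; // 100 MB parts (5 MB min, 5 GB max per part)

  const { UploadId } = await s3.createMultipartUpload({
    Bucket: bucket,
    Key: dstKey
  }).promise();

  const parts = [];
  for (let start = 0, partNumber = 1; start < objectSize; start += partSize, partNumber++) {
    const end = Math.min(start + partSize, objectSize) - 1;
    const res = await s3.uploadPartCopy({
      Bucket: bucket,
      Key: dstKey,
      UploadId,
      PartNumber: partNumber,
      CopySource: `${bucket}/${srcKey}`,
      CopySourceRange: `bytes=${start}-${end}`
    }).promise();
    parts.push({ ETag: res.CopyPartResult.ETag, PartNumber: partNumber });
  }

  return s3.completeMultipartUpload({
    Bucket: bucket,
    Key: dstKey,
    UploadId,
    MultipartUpload: { Parts: parts }
  }).promise();
}

From the handler you would then simply await multipartCopy(srcBucket, srcKey, "processed/" + srcKey, event.Records[0].s3.object.size), keeping the whole chain inside the promise the async handler returns.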