Cloud Storage-triggered Cloud Function keeps timing out with "Deadline exceeded"

I am using a Google Cloud Storage event to trigger a Cloud Function that writes an uploaded CSV to Cloud Datastore. The problem is that the CSV file has more than 8,000 rows, and the function's maximum timeout of 9 minutes is not enough.

Error: 4 DEADLINE_EXCEEDED: Deadline exceeded

I have also tried batched operations, but the timeout problem persists. Is there another solution I could use without too much re-architecting?

const {Datastore} = require('@google-cloud/datastore');
const {Storage} = require('@google-cloud/storage');
const path = require('path');
const os = require('os');
const fs = require('fs');
const csv = require('csv-parser');

const db = new Datastore();
const storage = new Storage();

exports.updateMasterlist = async (object, context, callback) => {
  const fileBucket = object.bucket;
  const filePath = object.name;
  const bucket = storage.bucket(fileBucket);
  const fileName = path.basename(filePath);
  const tempFilePath = path.join(os.tmpdir(), fileName);

  // Download the uploaded CSV to the instance's temp directory
  await bucket.file(filePath).download({destination: tempFilePath});

  var total = 0;
  var max = 0;
  var employees = [];
  var batch = 1;
  const kind = 'masterlist';

  fs.createReadStream(tempFilePath)
    .pipe(csv())
    .on('data', (record) => {
      let key = record['EmployeeID'];
      var empKey = db.key([kind, key]);
      const employee = {
        emp_id: record['EmployeeID'],
        full_name: `${record['Firstname']} ${record['MiddleName']} ${record['Lastname']}`,
        group: record['GroupName'],
        division: record['Division'],
        department: record['Department'],
        is_id: record['SupervisorID'],
        email_address: record['Email']
      };
      const emp_entity = {
        key: empKey,
        data: employee,
      };
      employees.push(emp_entity);
      total++;
      max++;
      // Datastore allows at most 500 entities per commit, so flush in batches.
      // Note: this upsert is not awaited inside the stream handler.
      if (max >= 499) {
        try {
          db.upsert(employees);
          console.log(`Uploading batch ${batch}`);
          batch++;
        } catch (e) {
          console.error(e);
          process.exit(1);
        }
        employees.length = 0;
        max = 0;
      }
    })
    .on('end', async () => {
      // Flush whatever is left after the last full batch
      try {
        await db.upsert(employees);
        console.log(`Uploading batch ${batch}`);
      } catch (e) {
        console.error(e);
        process.exit(1);
      }
      console.log('End of CSV file read!');
      console.log('BATCH INFORMATION:');
      console.log(`number of employees: ${total}`);
    });
  callback();
};

Here is a solution using Python 3.7:

from google.cloud import datastore
import csv
import time

start_time = time.time()
db = datastore.Client()
employees = []
count = 1

with open('masterlist.csv', 'rt') as f:
    reader = csv.DictReader(f)
    for record in reader:
        # Build one Datastore entity per CSV row
        key = record['EmployeeID']
        empKey = db.key('masterlist', key)
        employee = datastore.Entity(key=empKey)
        employee.update({
            'emp_id': record['EmployeeID'],
            'full_name': str(record['Firstname'] + ' ' + record['MiddleName'] + ' ' + record['Lastname']),
            'group': record['GroupName'],
            'division': record['Division'],
            'department': record['Department'],
            'is_id': record['SupervisorID'],
            'email_address': record['Email']
        })
        employees.append(employee)
        # put_multi() accepts at most 500 entities per call
        if len(employees) >= 499:
            db.put_multi(employees)
            employees.clear()
            print('Batch: ' + str(count) + ' added')
            count += 1

# Write the final partial batch
db.put_multi(employees)
print('Batch: ' + str(count) + ' added')
print("--- %s seconds ---" % (time.time() - start_time))
