我正在写一个节点AWS lambda函数,该功能从我的数据库中查询约5,000个项目,并通过消息将其发送到AWS SQS队列中。
我的本地环境涉及我与AWS Sam Local一起运行Lambda,并用GoAws模拟AWS SQ。
我的lambda的一个示例骨骼是:
async run() {
try {
const accounts = await this.getAccountsFromDB();
const results = await this.writeAccountsIntoQueue(accounts);
return 'I've written: ' + results + ' messages into SQS';
} catch (e) {
console.log('Caught error running job: ');
console.log(e);
return e;
}
}
我的getAccountsFromDB()
功能没有性能问题,它几乎可以立即运行,使我返回了5,000个帐户的数组。
我的writeAccountsIntoQueue
功能看起来像:
async writeAccountsIntoQueue(accounts) {
// Extract the sqsClient and queueUrl from the class
const { sqsClient, queueUrl } = this;
try {
// Create array of functions to concurrenctly call later
let promises = accounts.map(acc => async () => await sqsClient.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10,
})
);
// Invoke the functions concurrently, using helper function `eachLimit`
let writtenMessages = await eachLimit(promises, 3);
return writtenMessages;
} catch (e) {
console.log('Error writing accounts into queue');
console.log(e);
return e;
}
}
我的助手,eachLimit
看起来像:
async function eachLimit (funcs, limit) {
let rest = funcs.slice(limit);
await Promise.all(
funcs.slice(0, limit).map(
async (func) => {
await func();
while (rest.length) {
await rest.shift()();
}
}
)
);
}
,就我的理解,它应该将并发执行限制为limit
。
此外,我已经包装了AWS SDK SQS客户端,以返回具有看起来像:
的sendMessage
函数的对象 sendMessage(params) {
const { client } = this;
return new Promise((resolve, reject) => {
client.sendMessage(params, (err, data) => {
if (err) {
console.log('Error sending message');
console.log(err);
return reject(err);
}
return resolve(data);
});
});
}
所以没有任何幻想,只是诺言。
我已经设置了300秒后的lambda,lambda总是会出来,如果不这样做,它突然结束了,错过了一些应该继续进行的最终记录,这使我感到不安在某个地方默默地错误。当我检查SQS队列时,我缺少大约1,000个条目。
我可以在您的代码中看到一些问题,
首先:
let promises = accounts.map(acc => async () => await sqsClient.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10,
})
);
您正在滥用async / await
。请始终牢记await
将等到您的承诺得到解决,然后再继续下一个,在这种情况下,每当您映射阵列promises
并调用每个功能项目时,它将等待该功能在继续之前所包裹的诺言,这是不好的。由于您只想回来诺言,因此您可以简单地做到这一点:
const promises = accounts.map(acc => () => sqsClient.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10,
})
);
现在,在第二部分中,您的eachLimit
实现看起来错误且非常冗长,我在ES6-Promise-Pool的帮助下对其进行了重新分配,以处理您的并发限制:
const PromisePool = require('es6-promise-pool')
function eachLimit(promiseFuncs, limit) {
const promiseProducer = function () {
while(promiseFuncs.length) {
const promiseFunc = promiseFuncs.shift();
return promiseFunc();
}
return null;
}
const pool = new PromisePool(promiseProducer, limit)
const poolPromise = pool.start();
return poolPromise;
}
最后但非常重要的是,请查看SQS限制,SQS FIFO最多可发送300 sends/sec。由于您正在处理5K项目,因此您可能会将并发限制限制到5k/(300 50(,约15。50可能是任何正数,只是稍微远离极限。另外,考虑使用SendMessageBatch,您可能会有更多的吞吐量并达到3K sends/sec
edit
正如我上面建议的,使用 sendMessageBatch
吞吐量要好得多,因此我重构了代码映射您支持sendMessageBatch
的承诺:
function chunkArray(myArray, chunk_size){
var index = 0;
var arrayLength = myArray.length;
var tempArray = [];
for (index = 0; index < arrayLength; index += chunk_size) {
myChunk = myArray.slice(index, index+chunk_size);
tempArray.push(myChunk);
}
return tempArray;
}
const groupedAccounts = chunkArray(accounts, 10);
const promiseFuncs = groupedAccounts.map(accountsGroup => {
const messages = accountsGroup.map((acc,i) => {
return {
Id: `pos_${i}`,
MessageBody: JSON.stringify(acc),
DelaySeconds: 10
}
});
return () => sqsClient.sendMessageBatch({
Entries: messages,
QueueUrl: queueUrl
})
});
然后您可以照常调用eachLimit
:
const result = await eachLimit(promiseFuncs, 3);
现在的差异是,每个处理的承诺都会发送一批尺寸n的消息(上面的示例10(。