执行5000 承诺AWS SQS的AWS lambda功能极为不可靠



我正在写一个节点AWS lambda函数,该功能从我的数据库中查询约5,000个项目,并通过消息将其发送到AWS SQS队列中。

我的本地环境涉及我与AWS Sam Local一起运行Lambda,并用GoAws模拟AWS SQ。

我的lambda的一个示例骨骼是:

async run() {
  try {
    const accounts = await this.getAccountsFromDB();
    const results = await this.writeAccountsIntoQueue(accounts);
    return 'I've written: ' + results + ' messages into SQS';
  } catch (e) {
    console.log('Caught error running job: ');
    console.log(e);
    return e;
  }
}

我的getAccountsFromDB()功能没有性能问题,它几乎可以立即运行,使我返回了5,000个帐户的数组。

我的writeAccountsIntoQueue功能看起来像:

async writeAccountsIntoQueue(accounts) {
  // Extract the sqsClient and queueUrl from the class 
  const { sqsClient, queueUrl } = this;
  try {
    // Create array of functions to concurrenctly call later
    let promises = accounts.map(acc => async () => await sqsClient.sendMessage({
        QueueUrl: queueUrl,
        MessageBody: JSON.stringify(acc),
        DelaySeconds: 10,
      })
    );
    // Invoke the functions concurrently, using helper function `eachLimit`
    let writtenMessages = await eachLimit(promises, 3);
    return writtenMessages;
  } catch (e) {
    console.log('Error writing accounts into queue');
    console.log(e);
    return e;
  }
}

我的助手,eachLimit看起来像:

async function eachLimit (funcs, limit) {
  let rest = funcs.slice(limit);
  await Promise.all(
    funcs.slice(0, limit).map(
      async (func) => {
        await func();
        while (rest.length) {
          await rest.shift()();
        }
      }
    )
  );
}

,就我的理解,它应该将并发执行限制为limit

此外,我已经包装了AWS SDK SQS客户端,以返回具有看起来像:

sendMessage函数的对象
sendMessage(params) {
  const { client } = this;
  return new Promise((resolve, reject) => {
    client.sendMessage(params, (err, data) => {
      if (err) {
        console.log('Error sending message');
        console.log(err);
        return reject(err);
      }
      return resolve(data);
    });
  });
}

所以没有任何幻想,只是诺言。

我已经设置了300秒后的lambda,lambda总是会出来,如果不这样做,它突然结束了,错过了一些应该继续进行的最终记录,这使我感到不安在某个地方默默地错误。当我检查SQS队列时,我缺少大约1,000个条目。

我可以在您的代码中看到一些问题,

首先:

    let promises = accounts.map(acc => async () => await sqsClient.sendMessage({
        QueueUrl: queueUrl,
        MessageBody: JSON.stringify(acc),
        DelaySeconds: 10,
      })
    );

您正在滥用async / await。请始终牢记await将等到您的承诺得到解决,然后再继续下一个,在这种情况下,每当您映射阵列promises并调用每个功能项目时,它将等待该功能在继续之前所包裹的诺言,这是不好的。由于您只想回来诺言,因此您可以简单地做到这一点:

const promises = accounts.map(acc => () => sqsClient.sendMessage({
       QueueUrl: queueUrl,
       MessageBody: JSON.stringify(acc),
       DelaySeconds: 10,
    })
);

现在,在第二部分中,您的eachLimit实现看起来错误且非常冗长,我在ES6-Promise-Pool的帮助下对其进行了重新分配,以处理您的并发限制:

const PromisePool = require('es6-promise-pool')
function eachLimit(promiseFuncs, limit) {    
    const promiseProducer = function () {
        while(promiseFuncs.length) {
            const promiseFunc = promiseFuncs.shift();
            return promiseFunc();
        }
        return null;
    }
    const pool = new PromisePool(promiseProducer, limit)
    const poolPromise = pool.start();
    return poolPromise;
}

最后但非常重要的是,请查看SQS限制,SQS FIFO最多可发送300 sends/sec。由于您正在处理5K项目,因此您可能会将并发限制限制到5k/(300 50(,约15。50可能是任何正数,只是稍微远离极限。另外,考虑使用SendMessageBatch,您可能会有更多的吞吐量并达到3K sends/sec

edit

正如我上面建议的,使用 sendMessageBatch吞吐量要好得多,因此我重构了代码映射您支持sendMessageBatch的承诺:

function chunkArray(myArray, chunk_size){
    var index = 0;
    var arrayLength = myArray.length;
    var tempArray = [];
    for (index = 0; index < arrayLength; index += chunk_size) {
        myChunk = myArray.slice(index, index+chunk_size);
        tempArray.push(myChunk);
    }
    return tempArray;
}
const groupedAccounts = chunkArray(accounts, 10);
const promiseFuncs = groupedAccounts.map(accountsGroup => {
    const messages = accountsGroup.map((acc,i) => {
        return {
            Id: `pos_${i}`,
            MessageBody: JSON.stringify(acc),
            DelaySeconds: 10
        }
    });
    return () => sqsClient.sendMessageBatch({
        Entries: messages,
        QueueUrl: queueUrl
     })
});

然后您可以照常调用eachLimit

const result = await eachLimit(promiseFuncs, 3);

现在的差异是,每个处理的承诺都会发送一批尺寸n的消息(上面的示例10(。

最新更新