对队列的工作线程进行速率限制(例如:SQS)



每天,我都会运行一个CRON任务,该任务在SQS队列中填充了许多需要完成的任务。因此(例如)每天早上 9 点,空队列将收到 ~100 条需要处理的消息。

我希望每秒旋转一个新工人,直到队列为空。如果任何任务失败,则会将其放在队列的后面以重新运行。

例如,如果每个任务最多需要 1.5 秒才能完成:

    1
  • 秒后,1 个工作人员将启动消息 A
  • 2 秒后,1 个工作线程可能仍在运行消息 A,1 个工作线程将开始运行消息 B
  • 100 秒后,1 个工作线程可能仍在运行消息 XX,1 个工作线程将拾取消息 B,因为它之前失败
  • 101 秒后,直到第二天早上才传播更多的工作线程

有没有办法在 AWS lambda 中配置这种类型的基础设施?

一种方法,尽管我不相信它是最佳的:

由 CloudWatch 事件触发的 lambda(例如每秒或每 10 秒,具体取决于您的速率限制)。轮询 SQS 以接收(最多)N 条消息,然后它与每条消息"扇出"到另一个 Lambda 函数。


一些伪代码:

# Lambda 1 (schedule by CloudWatch Event / e.g. CRON)
def handle_cron(event, context):
    # in order to get more messages, we might have to receive several times (loop)
    for message in queue.receive_messages(MaxNumberOfMessages=10):
        # Note: the Event InvocationType so we don't want to wait for the response!
        lambda_client.invoke(FunctionName="foo", Payload=message.body, InvocationType='Event')

# Lambda 2 (triggered only by the invoke in Lambda 1)
def handle_message(event, context):
    # handle message
    pass
<</div> div class="one_answers">在我看来

,您最好将消息发布到 SNS,而不是 SQS,然后让您的 lambda 函数订阅 SNS 主题。

让 Lambda 担心它需要启动多少个"实例"才能响应负载。

这是一篇关于这种方法的博客文章,但谷歌可能会帮助你找到一个更接近你的实际用例的文章。

https://aws.amazon.com/blogs/mobile/invoking-aws-lambda-functions-via-amazon-sns/

为什么不直接有一个 Lambda 函数,它在上午 9 点开始轮询 sqs,一次获取一条消息并在每条消息之间休眠一秒钟?死信队列可以处理重试。在 x 秒后未收到来自 SQS 的消息后停止执行。

这是一种独特的情况,您实际上不需要并行处理。

相关内容

最新更新