每天,我都会运行一个CRON任务,该任务在SQS队列中填充了许多需要完成的任务。因此(例如)每天早上 9 点,空队列将收到 ~100 条需要处理的消息。
我希望每秒旋转一个新工人,直到队列为空。如果任何任务失败,则会将其放在队列的后面以重新运行。
例如,如果每个任务最多需要 1.5 秒才能完成:
- 1
- 秒后,1 个工作人员将启动消息 A
- 2 秒后,1 个工作线程可能仍在运行消息 A,1 个工作线程将开始运行消息 B
- 100 秒后,1 个工作线程可能仍在运行消息 XX,1 个工作线程将拾取消息 B,因为它之前失败 了
- 101 秒后,直到第二天早上才传播更多的工作线程
有没有办法在 AWS lambda 中配置这种类型的基础设施?
一种方法,尽管我不相信它是最佳的:
由 CloudWatch 事件触发的 lambda(例如每秒或每 10 秒,具体取决于您的速率限制)。轮询 SQS 以接收(最多)N 条消息,然后它与每条消息"扇出"到另一个 Lambda 函数。
一些伪代码:
# Lambda 1 (schedule by CloudWatch Event / e.g. CRON)
def handle_cron(event, context):
# in order to get more messages, we might have to receive several times (loop)
for message in queue.receive_messages(MaxNumberOfMessages=10):
# Note: the Event InvocationType so we don't want to wait for the response!
lambda_client.invoke(FunctionName="foo", Payload=message.body, InvocationType='Event')
和
# Lambda 2 (triggered only by the invoke in Lambda 1)
def handle_message(event, context):
# handle message
pass
<</div>
div class="one_answers">在我看来,您最好将消息发布到 SNS,而不是 SQS,然后让您的 lambda 函数订阅 SNS 主题。
让 Lambda 担心它需要启动多少个"实例"才能响应负载。
这是一篇关于这种方法的博客文章,但谷歌可能会帮助你找到一个更接近你的实际用例的文章。
https://aws.amazon.com/blogs/mobile/invoking-aws-lambda-functions-via-amazon-sns/
为什么不直接有一个 Lambda 函数,它在上午 9 点开始轮询 sqs,一次获取一条消息并在每条消息之间休眠一秒钟?死信队列可以处理重试。在 x 秒后未收到来自 SQS 的消息后停止执行。
这是一种独特的情况,您实际上不需要并行处理。