从SQS中读取没有并发的AWS Lambda



我的要求是这样的。

  1. 每2小时从SQS读取一次,获取所有可用的消息,然后进行处理。
  2. 处理包括从SQS消息中创建一个文件并将其发送到sftp服务器。

我实现了一个AWS Lambda来实现第1点。我有一个有sqs触发器的。我将批大小设置为50,然后将批窗口设置为2小时。我的假设是Lambda将每2小时触发一次,并且50条消息将一次性传递给Lambda函数,并且我将为每50条记录创建一个文件。

但是我观察到我的lambda函数被不同数量的消息触发(有时50有时20,有时5等),即使我已将批处理大小配置为50。
在阅读了一些文档后,我知道(我不确定)有5个长轮询连接,lambda生成从SQS读取,这导致lambda函数的这种行为被不同数量的消息触发。

我的问题是

  1. 我对5个并行连接的假设是正确的吗?如果有,我有办法控制吗?我希望这发生在一个线程/连接
  2. 如果1是不可能的,我这里有什么其他的选择。我不想为每几个记录创建一个文件。我希望每两个小时生成一个文件,其中包含sqs中的所有消息。

A "for Lambda是通过所谓的事件源映射集成实现的,它代表您从队列中轮询、批处理和删除消息。它是为连续轮询而设计的,不过您可以禁用它。您可以设置最大批量大小,一个函数接收的记录最多为10,000条(BatchSize)和最大300s长的轮询时间(MaximumBatchingWindowInSeconds)。那不符合你每两小时一次的要求。

两种选择:

  1. 删除事件源映射。相反,可以使用EventBridge规则每两小时触发一次Lambda。Lambda负责SQS ReceiveMessage和DeleteMessageBatch操作。这种方法可以确保Lambda在每个cron事件中只被调用一次。
  2. 保留事件源映射。在消息到达时处理消息,在S3中积累部分结果。每两个小时运行一次eventbridge触发的第二个Lambda,它将来自S3的部分结果捆绑在一起,并将它们发送到SFTP服务器。你不能控制Lambda调用的次数。

缩放说明:


<Edit(2023年1月中旬):AWS>

AWS Lambda现在支持为Amazon SQS事件源设置最大并发性,这是一种比保留并发性更直接、更简便的控制并发性的方法。最大并发性设置限制了Amazon SQS事件源可以调用的函数的并发实例的数量。有效范围是2-1000个并发实例。

创建和更新事件源映射api现在有一个SQS的ScalingConfig选项:

aws lambda update-event-source-mapping 
--uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" 
--scaling-config '{"MaximumConcurrency":2}' # valid range is 2-1000

& lt;/Edit>


使用SQS事件源映射集成,您可以调整批处理设置,但最终Lambda服务负责Lambda缩放。正如AWS博客理解AWS Lambda如何使用Amazon SQS标准队列进行扩展所述:

Lambda以批方式使用消息,从五个并发批开始,每次使用五个函数。如果队列中有更多消息,Lambda每分钟最多添加60个函数,最多1,000个函数,以消耗这些消息。

理论上,您可以使用保留并发性来限制并发Lambda执行的数量,但是您可能会由于节流错误而冒丢失消息的风险。

  1. 您可以尝试将该函数的ReservedConcurrency设置为1。这可能会有所帮助。参考文档

  2. 一个简单的解决方案是创建一个CloudWatch事件触发器(类似于Cronjob),每两个小时触发一次Lambda函数。在Lambda函数中,您在Queue上调用ReceiveMessage,直到获得所有消息,处理它们,然后从Queue中删除它们。缺点是可能有太多的消息要在15分钟内处理,所以这是你必须管理的事情。

最新更新