通过AWS Lambda重新处理数百万个S3文件



我正在努力为以下用例设置。我在S3桶中可能有数百万个文件,以天为单位。我想把一定时期的所有数据的时间流基于时间的查询。不幸的是,我注意到EC2上的单线程处理(我只是迭代文件并将它们批量发送到Timestream)不能很好地工作。一天摄入大约需要24小时。所以我尝试使用AWS Lambda处理作为替代方案。我创建了一个临时存储桶,用于从主存储桶同步一天的数据。每个文件用S3通知触发我的Lambda。这是相当整洁,允许缩放到无法达到的大小,但是!AWS Lambda的默认并发配额是1000。如果新传入的消息被排队,我会很好,但它们只是被丢弃。最重要的是,每个文件(.orc)甚至包含90k条记录。我注意到Timestream boto3客户端相当慢,保存100条记录批处理平均需要100-150ms。所以你自己算算……每个lambda执行最多需要3分钟!最重要的是!我还注意到,一些保存需要超过一秒钟(我假设时间流客户端节流或什么的),所以一些lambda在3分钟后超时。最后,我成功地在一次运行中获得了大约1/3 - 1/2的每日数据。

但是很快……所以我现在想要实现的是,找到一种更可持续的方式来吸收这些数据。kineesis允许多达8或10个并行因子(基于分片数)-不是很好。我想一直在100-200-500辆左右运行。因此,我需要一种将S3通知排队的方法,并以一次数百个的速度使用它们。另外,也许timestream应该表现的更好,我做错了什么?我的初始化代码:

timestream = boto3.client('timestream-write',
config=Config(read_timeout=5, max_pool_connections=5000, retries={'max_attempts': 10}))

哦,顺便说一句,我昨天注意到timestream有点奇怪。当我一次又一次地触发对同一个文件的处理时,它没有拒绝记录,而是默默地忽略它们,以200响应。不可思议的东西。

无论如何,感谢任何帮助,因为我已经无计可施了。

在s3中获取每个新文件时触发SQS,SQS触发λ和处理这些文件发送到λ在你认为合适的地方。SQS是如何工作的?SQS正在向lambda发送事件JSON,该文件不会从SQS队列中删除,而是移动到不可见队列中,当lambda成功完成后,消息(触发SQS的文件)将从SQS中删除,如果lambda失败,则SQS将再次发送消息给lambda。这里有更多关于SQS可见性的信息https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html

最新更新