我们有一个每分钟生成3000个文件的服务。文件大小小于5 KB。这些存储在azure中的blob存储中。我们需要连接这些文件并将其发送到S3。s3中的最终文件大小应该在10MB到100MB之间(这些数据通过Snowpipe传输到snowflake。(。如何以快速且经济高效的方式实现这一点。
添加更多信息:我已经尝试过的:
1( 正在向azure队列发送blob创建事件。将数据加载到S3的队列触发函数。然后使用aws-lambda连接(但lambda通常超时(
2( Python代码使用多处理,读取azure队列和blob,然后连接数据以创建一个10MB文件并将其发送到S3。尝试从azure网络作业运行此代码。(Webjob只有4个核心(。这还不够快,而且不可扩展。
我需要一个能够以最经济高效的方式并行运行任务并且可扩展的解决方案。它可以是一个批处理过程。S3中的数据延迟可以是24小时。(无法使用azure批处理,因为我们已经用完了订阅计划的其他进程的帐户数。(。
任何最适合这种情况的ETL工具或服务的建议。
脑海中浮现的一个创造性选项是创建AWS Lambda函数,该函数将执行以下几个步骤:
- 在S3中为输出对象生成名称
- 列出Blob(可能根据模式,根据您的具体情况而定(
- 遍历列表
- 下载blob
- 使用多部分上传将所有内容保存到对象中(建议使用smart_open-lib(
- [可选]删除所有源文件
现在您可以将此功能安排为每6-10分钟运行一次(取决于输入文件的大小(
该函数的代码应该类似于以下内容(尽管这不是经过测试的,只是从文档中编译的(
import datetime
import smart_open
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobClient
_STORAGE_ACCOUNT_NAME = "<storage account name>"
_STORAGE_ACCOUNT_KEY = "<storage account key>"
_CONTAINER_NAME = "<container name>"
blob_service = BlockBlobService(
account_name=_STORAGE_ACCOUNT_NAME,
account_key=_STORAGE_ACCOUNT_KEY)
s3_path = f"s3://your-bucket/prefix/path/{datetime.now().strftime('%Y-%m-%d-%H-%M')}.gz"
def lambda_handler(a, b):
generator = blob_service.list_blobs(_CONTAINER_NAME)
with smart_open.open(s3_path, 'wb') as out:
for blob in generator:
blob_client = BlobClient.from_blob_url(
blob_url=f"https://account.blob.core.windows.net/{blob.container}/{blob.name}")
out.write(blob_client.download_blob().readall())
if __name__ == '__main__':
lambda_handler(None, None)