Data Ingestion:从S3加载动态文件到Snowflake



情况:每个月有一个csv登陆AWS S3。供应商可以随心所欲地从文件中添加/删除/修改列。所以图式是事先不知道的。需求是在Snowflake中动态创建一个表,并将数据加载到表中。Matillion是我们的ELT工具。

这是我到目前为止所做的。

  1. 设置Lambda来检测文件的到达,将其转换为JSON,上传到另一个S3目录并将文件名添加到SQS。
  2. matilion检测SQS消息,并将带有JSON数据的文件加载到SF表的Variant列中。
  3. SF存储过程获取变量列,并根据JSON数据中的字段数生成一个表。SF中的VARIANT列只有在其为JSON数据时才以这种方式工作。CSV是不支持的。

这适用于10,000行。当我使用超过1GB的完整文件(超过10M行)运行此操作时,问题就出现了。它会在运行时以磁盘空间不足错误导致lambda作业崩溃。

这些是我目前想到的备选方案:

  1. 将一个EFS卷附加到lambda,并在上载到S3之前使用它来存储JSON文件。JSON数据文件比CSV文件大得多,我预计JSON文件大约有10-20GB,因为文件有超过10M行。
  2. matilion有一个Excel Query组件,它可以获取标题并动态创建表并加载文件。我想我可以将标题行从CSV转换为Lambda内的XLX文件,将其传递给Matillion,让它创建结构,然后在结构创建后加载CSV文件。

这里还有什么其他选项?考虑因素包括一个好的可重复设计模式,用于未来的大型csv或类似的需求,EFS的成本,我是否充分利用了我可用的工具?谢谢! !

为什么不将初始csv文件拆分为多个文件,然后以与当前相同的方式处理每个文件?

为什么要将CSV转换为JSON;CSV直接加载到表中,而不需要做任何数据转换,在JSON的情况下,横向扁平化将JSON转换为关系数据行;为什么不使用Snowpipe功能直接将数据加载到Snowflake中,而不使用Matallion。您可以在加载到Snowflake之前将大型csv文件分割成较小的块;这将有助于在顺丰仓库之间分配数据处理负载。

我还使用matilion将CSV文件从SFTP加载到Snowflake中,而不知道模式。

在我的过程中,我创建了一个"temp"表,包含50个VARCHAR列(我们的文件不应该超过50列)。我们的数据总是包含文本、日期或数字,所以VARCHAR不是问题。然后,我可以将.csv文件加载到临时表中。我相信这应该也适用于来自S3的文件。

这样至少可以把数据输入到雪花中。如何创造"最后"但是,考虑到你的情况,我不确定。我可以想象能够使用标题行,和/或对每列中包含的数据的"类型"进行一些分析,以确定所需的列类型。

但是如果你可以创建'final'表,你可以把数据从temp移过来,或者修改temp表本身。

这可以使用外部表来实现,其中外部表将被映射为单个列,分隔符将是一个新的行字符。外部表还有一个特殊的虚拟列,可以对其进行处理,以便动态提取所有列,然后使用存储过程在任何给定时间根据列的数量创建一个表。有一个有趣的视频讨论了雪花的这个限制(https://youtu.be/hRNu58E6Kmg)

最新更新