我已经在AWS中设置了ETL管道,如下所示
input_rawdata-> s3-> lambda->触发spark etl脚本(通过AWS胶) -> output(s3,parquet files)
我的问题是假设以上是数据的初始负载,我该如何设置每天(或每小时)添加新行或更新现有记录
的每天(或每小时)运行增量批次a。)如何继续附加到相同的S3 Parquet文件。因此,随后的Presto DB查询产生了最新数据。
b。)如何处理重复记录获取查询的最新时间戳。
在Spark脚本中,我是否需要使用source AS S3创建Hive外部表格,并用于Presto DB?
感谢您的任何输入。
apache hudi将是一个很好的工具:https://hudi.incubator.apache.org/您可以用parquet格式中存储在S3中的数据对表进行UPSERTS,PRESTO与其兼容。例如,使用EMR 5.28 HUDI已安装,您可以使用Hive,Spark和Presto查询Hudi数据集。
您可以在ETL工作中定义工作书签。
书签可以保留S3文件处理的跟踪,因此一旦处理了历史负载,并且如果您在S3上转储新文件,则只有ETL作业处理新文件,并将这些文件标记为内部处理。
。您可以处理这种方式增量数据。
首先不要试图附加到S3中存在的文件,而是用多个记录创建文件。
要查询S3中的分析您可以使用AWS ATHENA来描述数据,其中其数据目录是Hive Metastore兼容的。
要删除重复项,您可以通过achena编写SQL类似查询以查询唯一记录集。
由于您已经使用了lambda和胶水,因此可以使用kineis和kcl将数据捕获为流,或者使用Spark流,因为您拥有Spark脚本。这些选项中的任何一个都可以为您提供所需的增量输入。将实时流传输到现有数据库中时,数据损坏的风险较小。
然后,您可以用胶水传递数据。您可以将ETL工作安排或链接到胶水中,它可以将转换的数据加载到AWS存储桶中。胶水是面向批次的,但最小间隔为5分钟,并且Kinesis执行第一步,然后将完成的数据传递到胶水中,您仍然可以进行增量更新。您可以对此进行审查,以获取有关ETL体系结构的其他资源和想法。
对于任何重复数据,您可以在完成的数据集上运行SQL-ish查询。
您现在可以使用Delta使用Spark在数据上编造,附加和增量。该工具让您以" delta"格式(Spark Metadata文件)编写数据。您甚至可以恢复或查询数据到一个时间点。请注意,最近它没有与雅典娜/普雷斯托完全合作(开源),因为您需要创建清单(但要解决问题)。