如何使用AWS Glue作业覆盖过时的分区数据



我有每天转储一次的数据到s3://<bucket>mydata/year=*/month=*/*.snappy.parquet作为当月的累积数据。我有一个爬网程序来爬网它以更新mydata表,还有一个CW规则,它在爬网程序成功时调用lambda,启动Glue作业来转换列并输出到s3://<bucket>mydata transformed/year=*/month=*/*.snappy.parquet。这个流程基本上有效。然而,我目前面临的问题是,输出数据被添加写入,而不是替换现有数据(因为它是当月的累积数据(。例如,假设在2020年10月12日午夜,10/1的数据被转储到s3://<bucket>mydata/年=2020/月=10/*.snappy.parquet。流将在s3://<bucket>mydata transformed/year=2020/month=10/*.snappy.parquet,对于10/1数据来说一切都很好。然而,第二天,当10/1和10/2的数据被转储到s3://<bucket>mydata/year=2020/month=10/*.snappy.parquet(覆盖前一天的文件(,Glue作业将在输出文件夹中生成附加数据,即它将包含昨天运行的数据和今天运行的数据(因此10/1数据两次,10/2数据(。第二天,它将是10/1数据3X、10/2数据2X和10/3数据。等等。2020/09年及之前的数据还可以,因为它们没有变化。以下是我的代码的基本结构,去掉了锅炉板代码,用人工代码代替了真实的转换。

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1, glueContext, "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://<bucket>/mydata-transformed/", "partitionKeys": ["year","month"]}, transformation_ctx = "DataSink0")
job.commit()

我该怎么做才能使当前月份的前一天的数据被删除并替换为当前作业的数据?在我的例子中,有没有办法知道源数据中的month=10分区已经更改,因此我可以在进行转换和输出新数据之前清除输出中的同一分区?

谢谢。

[编辑]因此,一种解决方案似乎是获取作业书签,然后使用CURR_LATEST_PARTITIONS值来确定在处理数据之前应该删除哪个分区。在我的情况下,当我处理2020/10时,CURR_LATEST_PARTITIONS是2020/09。所以我知道删除2020/10的数据,因为如果CURR_LATEST_PARTITIONS是2020/09,那么这必须是正在处理的数据。我真的不喜欢这个解决方案,但我认为它会起作用,我不确定我还能做什么。

您有几个选项:

  1. DynamicFrameWriter还不支持覆盖S3中的数据。相反,您可以使用Spark本机write()。然而,对于真正大的数据集,它可能有点低效,因为S3中将使用单个工作程序来覆盖现有数据。示例如下:
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
ds_df1 
.write.mode('overwrite') 
.format('parquet') 
.partitionBy('year', 'month') 
.save('s3://<bucket>/mydata-transformed/')

job.commit()
  • 在lambda函数中,可以使用S3中某个前缀下的delete数据。使用Python和boto3的示例是:
  • import boto3
    
    s3_res = boto3.resource('s3')
    bucket = 'my-bucket-name'
    # Add any logic to derive required prefix based on year/month/day
    prefix = 'mydata/year=2020/month=10/'
    s3_res.Bucket(bucket).objects.filter(Prefix=key).delete()
    
    1. 您可以使用Glue的purge_s3_path从某个前缀中删除数据。链接在这里

    现在在glue中存在一个函数,用于删除S3路径或删除glue目录表。

    AWS GLue文档

    您可以使用purge_s3_path。

    请注意,它不会开箱即用,因为默认情况下"保留期"为7天,这意味着purge_s3_path不会删除任何超过168小时的内容。因此,如果你想像下面这样删除路径,你需要将保留期指定为零:

    glueContext.purge_s3_path('s3://s3_path/bucket', options={"retentionPeriod":0})
    

    最新更新