S3 bucket contents getting deleted while writing parquet files through a PySpark Glue job



I developed a PySpark Glue job to load complete/incremental datasets. It works fine. After loading the dataset, I have to perform a few aggregations and write the result to a single location in "overwrite"/"append" mode. For that I wrote the following code:

from pyspark.sql.functions import col, current_timestamp, lit, row_number
from pyspark.sql.window import Window

maxDateValuePath = "s3://...../maxValue/"
outputPath = "s3://..../complete-load/"
aggregatedPath = "s3://...../aggregated-output/"
fullLoad = ""
aggregatedView = ""
completeAggregatedPath = "s3://...../aggregated-output/step=complete-load/"
incrAggregatedPath = "s3://....../aggregated-output/step=incremental-load/"

data.createOrReplaceTempView("data")
aggregatedView = spark.sql("""
    select catid, count(*) as number_of_catids from data
    group by catid""")

if incrementalLoad == str(0):
    # complete load: stamp the rows and overwrite the complete-load prefix
    aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
    aggregatedView.write.mode("overwrite").parquet(completeAggregatedPath)
elif incrementalLoad == str(1):
    # incremental load: append the new aggregates, then re-read the full prefix
    aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
    log.info("step 123: " + str(aggregatedView.count()))
    aggregatedView.write.mode("append").parquet(completeAggregatedPath)
    aggregatedView = spark.read.parquet(completeAggregatedPath)
    log.info("step 126: " + str(aggregatedView.count()))
    # keep only the latest aggregated record per catid
    w = Window.partitionBy("catid").orderBy(col("created_at").desc())
    aggregatedView = aggregatedView.withColumn("rw", row_number().over(w)).filter(col("rw") == lit(1)).drop("rw")
    log.info("step 130: " + str(aggregatedView.count()))
    aggregatedView.orderBy(col("created_at").desc()).show()
    print("::::::::::::before writing::::::::::::::")
    aggregatedView.write.mode("overwrite").parquet(incrAggregatedPath)

Here 0 and 1 stand for full load and incremental load respectively. Before writing the transformed dataset, I add a created_at column, which is used to keep only the latest aggregated record after the incremental dataset has been written; otherwise it would lead to duplicates.
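
To make the deduplication idea concrete in isolation, here is a minimal sketch with made-up sample data (the catid values and timestamps are hypothetical, not taken from the actual job):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# hypothetical sample: two loads produced two rows for catid 1
rows = [
    (1, 10, "2020-01-01 00:00:00"),
    (1, 12, "2020-01-02 00:00:00"),
    (2, 5, "2020-01-01 00:00:00"),
]
df = spark.createDataFrame(rows, ["catid", "number_of_catids", "created_at"])

# same pattern as the job: keep only the newest row per catid
w = Window.partitionBy("catid").orderBy(col("created_at").desc())
latest = df.withColumn("rw", row_number().over(w)).filter(col("rw") == 1).drop("rw")
latest.show()  # catid 1 keeps number_of_catids = 12, catid 2 keeps 5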

Everything works as expected, but the problem is this: when I try to write the dataset in overwrite mode in the incremental branch, using the line aggregatedView.write.mode("overwrite").parquet(aggregatedPath), the bucket contents get deleted in S3 and the operation fails with the following error:

Caused by: java.io.FileNotFoundException: File not present on S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

Why is the bucket getting deleted?

The problem lies in your code. You are reading from and writing to the same location in the incremental branch.

aggregatedView = spark.read.parquet(aggregatedPath)
...
...
aggregatedView.write.mode("overwrite").parquet(aggregatedPath)  

Since Spark evaluates lazily, when you specify the mode as overwrite it first clears the data in that folder, leaving you with nothing to read. Only when execution reaches the write part of the code does it actually start reading the data, and by that point the data has already been wiped out by your own write operation.
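
The failure mode can be reproduced with just a few lines; this is a sketch where samePath is a hypothetical prefix standing in for the paths above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

samePath = "s3://some-bucket/aggregated-output/"  # hypothetical prefix

df = spark.read.parquet(samePath)        # lazy: no files are read yet
df = df.filter("number_of_catids > 0")   # still lazy, just extends the plan

# the overwrite first clears samePath, then executes the plan above,
# which now points at files that no longer exist -> FileNotFoundException
df.write.mode("overwrite").parquet(samePath)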

So I solved the problem by changing the line of code below:

aggregatedView2 = spark.read.parquet(completeAggregatedPath)

This gives the aggregated view its own DataFrame lineage. Because the read and the write were previously performed on the same S3 location and through the same DataFrame lineage, the overwrite deleted the prefix while the DataFrame's source data was still ambiguous. Creating a new DataFrame that reads straight from the S3 location, instead of reusing the earlier transformations, avoids this.
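
Put together, the corrected incremental branch looks roughly like this (a sketch reusing the variable names from the question; aggregatedView2 is the new DataFrame introduced by the fix):

# append the freshly aggregated rows to the complete-load prefix
aggregatedView.write.mode("append").parquet(completeAggregatedPath)

# read back into a NEW DataFrame, so the dedup and final write below
# have their own lineage instead of reusing the earlier transformations
aggregatedView2 = spark.read.parquet(completeAggregatedPath)

# keep only the latest aggregated record per catid
w = Window.partitionBy("catid").orderBy(col("created_at").desc())
aggregatedView2 = aggregatedView2.withColumn("rw", row_number().over(w)) \
    .filter(col("rw") == lit(1)).drop("rw")

# overwrite a different prefix than the one that was just read
aggregatedView2.write.mode("overwrite").parquet(incrAggregatedPath)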

Hope this helps someone!
