将文件从一个镶木地板分区移动到另一个分区



我的S3存储桶中有大量数据,由两列MODULEDATE分区这样我的拼花的文件结构是:

s3://my_bucket/path/file.parquet/MODULE='XYZ'/DATE=2020-01-01

我有7个MODULE,而DATE的范围从2020-01-012020-09-01。我发现数据不一致,需要更正其中一个模块的MODULE条目。基本上,我需要将属于MODULEXYZ的特定索引号的所有数据更改为MODULEABC。我可以在pyspark中通过加载数据帧并执行以下操作来做到这一点:

df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))

但是,我如何对其进行重新分区,以便只有那些更改的条目才能移动到ABC MODULE分区?如果我做了这样的事情:

df.mode('append').partitionBy('MODULE','DATE').parquet(s3://my_bucket/path/file.parquet")

我会将数据与错误的MODULE数据一起添加。此外,我有将近一年的数据,不想重新划分整个数据集,因为这需要很长时间。

有办法做到这一点吗?

如果我理解得很好,您在分区MODULE=XYZ中有数据,应该移动到MODULE=ABC

首先,识别受影响的文件。

from pyspark.sql import functions as F
file_list = df.where(F.col("index") == 34).select(
F.input_file_name()
).distinct().collect()

然后,您只基于这些文件创建一个数据帧,并使用它来完成两个MODULE

df = spark.read.parquet(file_list).withColumn(
"MODULE", when(col("index") == 34, "ABC").otherwise(col("MODULE"))
)
df.write.parquet(
"s3://my_bucket/path/ABC/", mode="append", partitionBy=["MODULE", "DATE"]
)

在这一点上,ABC应该是可以的(您只是添加了缺失的数据(,但XYZ应该是错误的,因为数据重复。要恢复XYZ,您只需要删除file_list中的文件列表。

IIUC您可以通过过滤特定索引的数据来实现这一点,然后将数据与日期一起保存为分区。

df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))
df = df.filter(col('index')==34)
df.mode('overwrite').partitionBy('DATE').parquet(s3://my_bucket/path/ABC/")

这样,您最终只会修改更改后的模块,即ABC

相关内容

最新更新