用修改后的PySpark DataFrame覆盖现有的Parquet数据集



用例是将一列附加到Parquet数据集,然后在同一位置高效地重写。下面是一个简单的例子:

创建一个pandasDataFrame并写入一个分区的Parquet数据集

import pandas as pd
df = pd.DataFrame({
'id': ['a','a','a','b','b','b','b','c','c'],
'value': [0,1,2,3,4,5,6,7,8]})
path = r'c:/data.parquet'
df.to_parquet(path=path, engine='pyarrow', compression='snappy', index=False, partition_cols=['id'], flavor='spark')

然后加载Parquet数据集作为pyspark视图,并创建一个修改后的数据集作为pysparkDataFrame。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.parquet(path).createTempView('data')
sf = spark.sql(f"""SELECT id, value, 0 AS segment FROM data""")

此时sf数据与df数据相同,但增加了全0的segment列。我想有效地覆盖现有的Parquet数据集在pathsf作为Parquet数据集在同一位置。下面是不工作的。也不建议将sf写入新位置,删除旧的Parquet数据集,然后重命名,因为这样看起来效率不高。

# saves existing data and new data
sf.write.partitionBy('id').mode('append').parquet(path)
# immediately deletes existing data then crashes
sf.write.partitionBy('id').mode('overwrite').parquet(path)

简而言之:你不应该:

大数据的一个原则(spark是为大数据服务的)是永远不要覆盖东西。当然,.mode('overwrite')是存在的,但这不是一个正确的用法。

我猜它为什么会失败:

  • 您添加了一个列,因此写入的数据集具有与当前存储的数据集不同的格式。这会造成模式混淆
  • 在处理时重写输入数据。所以spark读取一些行,处理它们并覆盖输入文件。但是这些文件仍然是其他行要处理的输入。

在这种情况下,我通常做的是创建另一个数据集,当没有理由保持旧的数据集时(即当处理完全完成时),清理它。要删除文件,您可以查看如何删除hdfs文件的文章。它应该适用于spark访问的所有文件。然而,它是在scala中,所以我不确定它是否可以适应pyspark。

注意,效率不是重写的好理由,它做的工作比简单写作。

最新更新