pyspark 在输出中写入许多较小的文件



我正在使用pyspark来处理一些数据并将输出写入S3。我在雅典娜中创建了一个表,该表将用于查询此数据。

数据采用 json 字符串(每行一个(的形式,Spark 代码读取文件,根据某些字段对其进行分区并写入 S3。

对于 1.1 GB 的文件,我看到 Spark 正在写入 36 个文件,每个文件大小大约 5 MB。 阅读雅典娜文档时,我看到最佳文件大小为 ~128 MB . https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/

sparkSess = SparkSession.builder
.appName("testApp")
.config("spark.debug.maxToStringFields", "1000")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.getOrCreate()
sparkCtx = sparkSess.sparkContext
deltaRdd = sparkCtx.textFile(filePath)
df = sparkSess.createDataFrame(deltaRdd, schema)
try:
df.write.partitionBy('field1','field2','field3')
.json(path, mode='overwrite', compression=compression)
except Exception as e:
print (e)

为什么Spark要写这么小的文件。有没有办法控制文件大小。

有什么方法可以控制文件大小吗?

有一些控制机制。但是,它们并不明确。

s3驱动程序不是Spark本身的一部分。它们是带有火花EMR的Hadoop装置的一部分。 s3 块大小可以在/etc/hadoop/core-site.xml配置文件。

但是默认情况下,它应该在 128 MB 左右。

为什么Spark要写入这么小的文件

Spark将遵循Hadoop块的大小。但是,您可以在编写之前使用partionBy

假设您使用partionBy("date").write.csv("s3://products/"). Spark 将创建一个子文件夹,其中包含每个分区的date。在 每个分区文件夹 Spark 将再次尝试创建块并尝试遵守fs.s3a.block.size

例如

s3:/products/date=20191127/00000.csv
s3:/products/date=20191127/00001.csv
s3:/products/date=20200101/00000.csv

在上面的例子中 - 一个特定的分区可以小于128mb的块大小。

因此,只需仔细检查您的块大小,/etc/hadoop/core-site.xml是否需要在写入之前使用partitionBy对数据框进行分区。

编辑:

类似的帖子还建议重新分区数据帧以匹配partitionBy方案

df.repartition('field1','field2','field3')
.write.partitionBy('field1','field2','field3')

writer.partitionBy对现有数据帧分区进行操作。它不会repartition原始数据帧。因此,如果整个数据帧的分区方式不同,则会发生嵌套分区。

最新更新