我正在使用pyspark来处理一些数据并将输出写入S3。我在雅典娜中创建了一个表,该表将用于查询此数据。
数据采用 json 字符串(每行一个(的形式,Spark 代码读取文件,根据某些字段对其进行分区并写入 S3。
对于 1.1 GB 的文件,我看到 Spark 正在写入 36 个文件,每个文件大小大约 5 MB。 阅读雅典娜文档时,我看到最佳文件大小为 ~128 MB . https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/
sparkSess = SparkSession.builder
.appName("testApp")
.config("spark.debug.maxToStringFields", "1000")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.getOrCreate()
sparkCtx = sparkSess.sparkContext
deltaRdd = sparkCtx.textFile(filePath)
df = sparkSess.createDataFrame(deltaRdd, schema)
try:
df.write.partitionBy('field1','field2','field3')
.json(path, mode='overwrite', compression=compression)
except Exception as e:
print (e)
为什么Spark要写这么小的文件。有没有办法控制文件大小。
有什么方法可以控制文件大小吗?
有一些控制机制。但是,它们并不明确。
s3驱动程序不是Spark本身的一部分。它们是带有火花EMR的Hadoop装置的一部分。 s3 块大小可以在/etc/hadoop/core-site.xml
配置文件。
但是默认情况下,它应该在 128 MB 左右。
为什么Spark要写入这么小的文件
Spark将遵循Hadoop块的大小。但是,您可以在编写之前使用partionBy
。
假设您使用partionBy("date").write.csv("s3://products/")
. Spark 将创建一个子文件夹,其中包含每个分区的date
。在 每个分区文件夹 Spark 将再次尝试创建块并尝试遵守fs.s3a.block.size
。
例如
s3:/products/date=20191127/00000.csv
s3:/products/date=20191127/00001.csv
s3:/products/date=20200101/00000.csv
在上面的例子中 - 一个特定的分区可以小于128mb的块大小。
因此,只需仔细检查您的块大小,/etc/hadoop/core-site.xml
是否需要在写入之前使用partitionBy
对数据框进行分区。
编辑:
类似的帖子还建议重新分区数据帧以匹配partitionBy
方案
df.repartition('field1','field2','field3')
.write.partitionBy('field1','field2','field3')
writer.partitionBy
对现有数据帧分区进行操作。它不会repartition
原始数据帧。因此,如果整个数据帧的分区方式不同,则会发生嵌套分区。