如何控制在spark中设置大分区时生成的文件数量



由于输入数据量大,我设置了spark(spark.sql.shuffle.partitions=1000(的大的shuffle分区。然而,输出文件很小(~1GB(,但它会创建许多小文件(3000个文件,每个文件都小于1Mb(。如何将这些小文件合并为一个大文件?

另一个问题是,为什么输出文件的数量是shuffle分区数量的3倍?

根据Spark文档,spark.sql.shuffle.partitions参数Configures the number of partitions to use when shuffling data for joins or aggregations.。要控制输出文件的数量,请在写入输出之前使用repartition()方法。这样的东西:

df
.filter(...)  // some transformations
.join(...)
.repartition(1)  // move data into a single partition
.write
.format(...)
.save(...)

上面的代码片段将生成一个输出文件。

您不局限于一次重新划分数据-您可以根据需要进行多次重新划分,但请记住,这是一项成本高昂的操作:

df
.filter(...)  // some transformations
.repartition(...)  // repartition to improve join performance
.join(...)
.repartition(1)  // move data into a single partition
.write
.format(...)
.save(...)

如果你想很好地解释repartition是如何工作的,这里有一个很好的答案:Spark-重新分区((与合并((

有关如何提高联接性能的更多信息,请参阅Spark文档:https://spark.apache.org/docs/latest/sql-performance-tuning.html#join-sql查询的策略提示

因为您有大量的分区。你可能需要在约会框架上达成一致。聚结将减少分区的数量。

val df_res = df.coalesce(10)

这样可以将输出文件的数量从1000个减少到10个。或者您可以coalesce(1)来创建一个大文件。

Coalesce使用现有的分区并最大限度地减少混乱的数据。结果可能大小不同。

输出文件的数量等于分区的数量。当对用于联接或聚合的数据进行混洗时使用属性(spark.sql.shuffle.partitions(。

您可以对数据帧执行df.repartition()以增加/减少分区。

最新更新