我想重新分区/合并我的数据,以便将其保存到每个分区的一个Parquet文件中。 我还想使用 Spark SQL partitionBy API。 所以我可以这样做:
df.coalesce(1)
.write
.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append)
.parquet(s"$location")
我已经对此进行了测试,它似乎表现不佳。 这是因为数据集中只有一个分区需要处理,文件的所有分区、压缩和保存都必须由一个 CPU 内核完成。
我可以重写它以在调用 coalesce 之前手动进行分区(例如使用具有不同分区值的过滤器)。
但是有没有更好的方法来使用标准的Spark SQL API来做到这一点?
我遇到了完全相同的问题,我找到了一种使用 DataFrame.repartition()
来做到这一点的方法。使用 coalesce(1)
的问题在于并行度下降到 1,最好的情况可能是慢的,最坏的情况是错误。增加这个数字也无济于事 - 如果你这样做coalesce(10)
你会得到更多的并行性,但最终每个分区有 10 个文件。
要在不使用 coalesce()
的情况下获取每个分区一个文件,请使用具有您希望输出分区的相同列的repartition()
。因此,在您的情况下,请执行以下操作:
import spark.implicits._
df
.repartition($"entity", $"year", $"month", $"day", $"status")
.write
.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append)
.parquet(s"$location")
一旦我这样做,我就会得到每个输出分区一个镶木地板文件,而不是多个文件。
我在Python中对此进行了测试,但我认为在Scala中它应该是相同的。
根据定义:
coalesce(numPartitions: Int): DataFrame 返回具有正好 numPartitions 分区的新数据帧。
您可以使用它通过 numPartitions 参数减少 RDD/数据帧中的分区数。这对于在筛选大型数据集后更有效地运行操作非常有用。
关于您的代码,它表现不佳,因为您实际正在做的是:
- 将所有内容放入 1 个分区
中,这会使驱动程序过载,因为它将所有数据拉入驱动程序上的 1 个分区(而且这不是一个好的做法)
coalesce
实际上会洗牌网络上的所有数据,这也可能导致性能损失。
随机播放是 Spark 用于重新分发数据的机制,以便跨分区进行不同的分组。这通常涉及跨执行程序和计算机复制数据,使随机操作变得复杂且成本高昂。
随机洗牌概念对于管理和理解非常重要。始终最好尽可能随机,因为它涉及磁盘 I/O、数据序列化和网络 I/O,因此这是一项成本高昂的操作。为了组织随机播放的数据,Spark 会生成一组任务 - 映射任务以组织数据,以及一组归约任务以聚合数据。这个命名法来自MapReduce,与Spark的map和reduce操作没有直接关系。
在内部,各个地图任务的结果将保留在内存中,直到它们无法容纳为止。然后,根据目标分区对这些进行排序并写入单个文件。在归约端,任务读取相关的排序块。
关于分区拼接复合地板,我建议您阅读有关带镶木地板分区的 Spark DataFrame 的答案,以及性能调优的 Spark 编程指南中的这一部分。
我希望这有帮助!
@mortada的解决方案之上的太多内容,但这里有一个小抽象,可以确保您使用相同的分区来重新分区和写入,并演示排序:
def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
start = datetime.now()
(df.repartition(*partitions)
.sortWithinPartitions(*sort_within_partitions)
.write.partitionBy(*partitions)
# TODO: Format of your choosing here
.mode(SaveMode.Append).parquet(path)
# or, e.g.:
#.option("compression", "gzip").option("header", "true").mode("overwrite").csv(path)
)
print(f"Wrote data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
f"n {path}n Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")
用法:
one_file_per_partition(df, location, ["entity", "year", "month", "day", "status"])