数据帧分区由单个 Parquet 文件(每个分区)



我想重新分区/合并我的数据,以便将其保存到每个分区的一个Parquet文件中。 我还想使用 Spark SQL partitionBy API。 所以我可以这样做:

df.coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

我已经对此进行了测试,它似乎表现不佳。 这是因为数据集中只有一个分区需要处理,文件的所有分区、压缩和保存都必须由一个 CPU 内核完成。

我可以重写它以在调用 coalesce 之前手动进行分区(例如使用具有不同分区值的过滤器)。

但是有没有更好的方法来使用标准的Spark SQL API来做到这一点?

我遇到了完全相同的问题,我找到了一种使用 DataFrame.repartition() 来做到这一点的方法。使用 coalesce(1) 的问题在于并行度下降到 1,最好的情况可能是慢的,最坏的情况是错误。增加这个数字也无济于事 - 如果你这样做coalesce(10)你会得到更多的并行性,但最终每个分区有 10 个文件。

要在不使用 coalesce() 的情况下获取每个分区一个文件,请使用具有您希望输出分区的相同列的repartition()。因此,在您的情况下,请执行以下操作:

import spark.implicits._
df
  .repartition($"entity", $"year", $"month", $"day", $"status")
  .write
  .partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append)
  .parquet(s"$location")

一旦我这样做,我就会得到每个输出分区一个镶木地板文件,而不是多个文件。

我在Python中对此进行了测试,但我认为在Scala中它应该是相同的。

根据定义:

coalesce(numPartitions: Int): DataFrame 返回具有正好 numPartitions 分区的新数据帧。

您可以使用它通过 numPartitions 参数减少 RDD/数据帧中的分区数。这对于在筛选大型数据集后更有效地运行操作非常有用。

关于您的代码,它表现不佳,因为您实际正在做的是:

    将所有内容放入 1 个分区
  1. 中,这会使驱动程序过载,因为它将所有数据拉入驱动程序上的 1 个分区(而且这不是一个好的做法)

  2. coalesce实际上会洗牌网络上的所有数据,这也可能导致性能损失。

随机播放是 Spark 用于重新分发数据的机制,以便跨分区进行不同的分组。这通常涉及跨执行程序和计算机复制数据,使随机操作变得复杂且成本高昂。

随机洗牌概念对于管理和理解非常重要。始终最好尽可能随机,因为它涉及磁盘 I/O、数据序列化和网络 I/O,因此这是一项成本高昂的操作。为了组织随机播放的数据,Spark 会生成一组任务 - 映射任务以组织数据,以及一组归约任务以聚合数据。这个命名法来自MapReduce,与Spark的map和reduce操作没有直接关系。

内部,各个地图任务的结果将保留在内存中,直到它们无法容纳为止。然后,根据目标分区对这些进行排序并写入单个文件。在归约端,任务读取相关的排序块。

关于分区拼接复合地板,我建议您阅读有关带镶木地板分区的 Spark DataFrame 的答案,以及性能调优的 Spark 编程指南中的这一部分。

我希望这有帮助!

它不是

@mortada的解决方案之上的太多内容,但这里有一个小抽象,可以确保您使用相同的分区来重新分区和写入,并演示排序:

  def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
    start = datetime.now()
    (df.repartition(*partitions)
      .sortWithinPartitions(*sort_within_partitions)
      .write.partitionBy(*partitions)
      # TODO: Format of your choosing here
      .mode(SaveMode.Append).parquet(path)
      # or, e.g.:
      #.option("compression", "gzip").option("header", "true").mode("overwrite").csv(path)
    )
    print(f"Wrote data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
        f"n  {path}n  Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

用法:

one_file_per_partition(df, location, ["entity", "year", "month", "day", "status"])

相关内容

  • 没有找到相关文章

最新更新