在火花 DF 中使用分区后是否可以进行重新分区



我问这个问题是因为如果我将重新分区指定为 5,那么我的所有数据(>200Gigs(都会移动到 5 个不同的执行器,并且 98% 的资源未使用。 然后分区发生,这再次产生了很多随机。有没有办法先发生分区,然后对数据运行重新分区?

尽管这个问题并不完全容易理解,但以下内容与其他答案一致,这种方法应避免不必要的洗牌中提到的问题:

val n = [... some calculation for number of partitions / executors based on cluster config and volume of data to process ...]
df.repartition(n, $"field_1", $"field_2", ...)
  .sortWithinPartitions("fieldx", "field_y")
  .write.partitionBy("field_1", "field_2", ...)
  .format("location")

其中 [field_1、field_2、...] 是用于重新分区和分区的同一组字段。

您可以使用

repartition(5, col("$colName")) .
因此,当您进行partitionBy("$colName")时,您将跳过'$colName'的随机播放,因为它已经被重新分区了。

还要考虑具有与执行程序数乘以 3 的已用内核数的乘积一样多的分区数(不过这可能在 2 到 4 之间变化(。
众所周知,Spark 只能为 RDD 的每个分区运行 1 个并发任务。假设每个执行程序有 8 个内核和 5 个执行程序:
您需要有:8 * 5 * 3 = 120 个分区

最新更新