sortWithinPartitions如何排序?



将sortWithinPartitions应用于df并将输出写入表后,我得到了一个结果,我不确定如何解释。

df
.select($"type", $"id", $"time")
.sortWithinPartitions($"type", $"id", $"time")

结果文件看起来有点像

1 a 5
2 b 1
1 a 6
2 b 2
1 a 7
2 b 3
1 a 8
2 b 4

它实际上不是随机的,但也不是像我期望的那样排序。也就是说,先按类型,然后是id,然后是时间。如果我尝试在排序之前使用重分区,那么我就会得到我想要的结果。但由于某些原因,文件的重量是原来的5倍(100gb vs 20gb)。

我正在写一个hive orc表,压缩设置为snappy。

有谁知道为什么它是这样排序的,为什么重分区得到正确的顺序,但更大的大小?

使用spark 2.2.

sortWithinPartition的文档状态

返回一个新的数据集,每个分区按给定表达式排序

考虑这个函数的最简单方法是想象第四列(分区id),它被用作主要排序标准。函数spark_partition_id()打印分区

例如,如果你只有一个大的分区(这是你作为一个Spark用户永远不会做的!),sortWithinPartition工作作为一个正常的排序:

df.repartition(1)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();

打印

+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   1|  a|   5|        0|
|   1|  a|   6|        0|
|   1|  a|   7|        0|
|   1|  a|   8|        0|
|   2|  b|   1|        0|
|   2|  b|   2|        0|
|   2|  b|   3|        0|
|   2|  b|   4|        0|
+----+---+----+---------+

如果有更多的分区,结果只在每个分区内排序:

df.repartition(4)
.sortWithinPartitions("type","id","time")
.withColumn("partition", spark_partition_id())
.show();

打印

+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   2|  b|   1|        0|
|   2|  b|   3|        0|
|   1|  a|   5|        1|
|   1|  a|   6|        1|
|   1|  a|   8|        2|
|   2|  b|   2|        2|
|   1|  a|   7|        3|
|   2|  b|   4|        3|
+----+---+----+---------+

为什么要使用sortWithPartition而不是sort?sortWithPartition不触发洗牌,因为数据只在执行器内移动。sort将触发洗牌。因此sortWithPartition执行得更快。如果数据按有意义的列进行分区,那么在每个分区内进行排序可能就足够了。

最新更新