我有 4 列(c1, c2, c3, c4)
的数据集。
我按(c1, c2)
分组并执行了一些聚合。
如果在此之后我按(c1)
分组,此分组操作会因为以前的分组而更有效吗?
截至今天(Spark 2.1(,优化器不使用这样的结构:
Seq.empty[(Int, Int, Int)].toDF("c1", "c2", "c3")
.groupBy($"c1", $"c2")
.sum("c3")
.groupBy($"c1")
.avg($"c3")
.explain
== Physical Plan ==
*HashAggregate(keys=[c1#130], functions=[avg(c3#142L)])
+- Exchange hashpartitioning(c1#130, 200)
+- *HashAggregate(keys=[c1#130], functions=[partial_avg(c3#142L)])
+- *HashAggregate(keys=[c1#130, c2#131], functions=[sum(cast(c3#132 as bigint))])
+- Exchange hashpartitioning(c1#130, c2#131, 200)
+- *HashAggregate(keys=[c1#130, c2#131], functions=[partial_sum(cast(c3#132 as bigint))])
+- LocalTableScan <empty>, [c1#130, c2#131, c3#132]
如您所见,Spark 计划两个独立的Exchanges
,而不是对两个聚合使用更常规的分组。因此,在实践中,答案取决于两个因素:
聚合功能。如果函数以恒定的内存占用量运行并应用"映射端"缩减(如
sum
(,那么较低的熵可以增加可以局部减少的数据量并减小随机大小。值分布。
c1
基数必须足够高,以便在第一个聚合中将多个(c1, c2)
级别分配给单个分区。如果每个分区的每个c1
值只有减少的行,则没有任何好处。
如果同时满足这两个条件,您应该会看到一些性能提升。