按较短的键对 Spark 数据集进行分组,无需完全随机播放



我有 4 列(c1, c2, c3, c4)的数据集。

我按(c1, c2)分组并执行了一些聚合。

如果在此之后我按(c1)分组,此分组操作会因为以前的分组而更有效吗?

截至今天(Spark 2.1(,优化器不使用这样的结构:

Seq.empty[(Int, Int, Int)].toDF("c1", "c2", "c3")
  .groupBy($"c1", $"c2")
  .sum("c3")
  .groupBy($"c1")
  .avg($"c3")
  .explain
== Physical Plan ==
*HashAggregate(keys=[c1#130], functions=[avg(c3#142L)])
+- Exchange hashpartitioning(c1#130, 200)
   +- *HashAggregate(keys=[c1#130], functions=[partial_avg(c3#142L)])
      +- *HashAggregate(keys=[c1#130, c2#131], functions=[sum(cast(c3#132 as bigint))])
         +- Exchange hashpartitioning(c1#130, c2#131, 200)
            +- *HashAggregate(keys=[c1#130, c2#131], functions=[partial_sum(cast(c3#132 as bigint))])
               +- LocalTableScan <empty>, [c1#130, c2#131, c3#132]

如您所见,Spark 计划两个独立的Exchanges,而不是对两个聚合使用更常规的分组。因此,在实践中,答案取决于两个因素:

  • 聚合功能。如果函数以恒定的内存占用量运行并应用"映射端"缩减(如sum(,那么较低的熵可以增加可以局部减少的数据量并减小随机大小。

  • 值分布。 c1基数必须足够高,以便在第一个聚合中将多个(c1, c2)级别分配给单个分区。如果每个分区的每个c1值只有减少的行,则没有任何好处。

如果同时满足这两个条件,您应该会看到一些性能提升。

相关内容

  • 没有找到相关文章

最新更新