我正在尝试计算Spark 1.6.1 中的以下数据框的最大值:
val df = sc.parallelize(Seq(1,2,3)).toDF("id")
第一种方法是选择最大值,并且可以按预期工作:
df.select(max($"id")).show
第二种方法可能是使用withColumn
如下:
df.withColumn("max", max($"id")).show
,但不幸的是,由于以下错误消息而失败:
org.apache.spark.sql.sql.analysisexception:expression'id'都不是 存在于组中,也不是汇总函数。添加到组 如果您不关心哪个值 你得到。;
如何在没有任何Window
或groupBy
的情况下计算withColumn
函数中的最大值?如果不可能,在这种特定情况下,我该如何使用Window
?
正确的方法是将汇总计算为单独的查询,并与实际结果结合使用。与窗口函数不同,在这里的许多答案中建议,它不需要单个分区的混音,并且适用于大型数据集。
可以使用单独的操作完成withColumn
:
import org.apache.spark.sql.functions.{lit, max}
df.withColumn("max", lit(df.agg(max($"id")).as[Int].first))
,使用任何一个明确的方法要干净:
import org.apache.spark.sql.functions.broadcast
df.crossJoin(broadcast(df.agg(max($"id") as "max")))
或隐式交叉加入:
spark.conf.set("spark.sql.crossJoin.enabled", true)
df.join(broadcast(df.agg(max($"id") as "max")))
apache spark中的功能很少。
- 聚集功能,例如麦克斯,当我们想将多行汇总到一个 时
- 当我们想将一个列转换为另一列 时
- 收集功能,例如爆炸,当一排将扩展到多行时。
隐式聚合
当我们想将更多的行聚集到一个时。
以下代码内部具有聚合。
df.select(max($"id")).explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[max(id#3)])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_max(id#3)])
+- *Project [value#1 AS id#3]
+- Scan ExistingRDD[value#1]
我们还可以在选择中使用多个聚合函数。
df.select(max($"id"), min($"id")).explain
聚合函数无法直接与非聚合函数混合
以下代码将报告错误。
df.select(max($"id"), $"id")
df.withColumn("max", max($"id"))
因为 max($"id")
的值很少,然后 $"id"
用over
汇总在这种情况下,将分析函数应用于结果集中的所有行。
中的所有行。我们可以使用
df.select(max($"id").over, $"id").show
或
df.withColumn("max", max($"id").over).show
这是Spark 2.0 此处。
使用withColumn
和窗口功能,可能如下:
df.withColumn("max", max('id) over)
请注意,要假设一个"空"窗口的空over
(相当于over ()
)。
但是,如果您需要更完整的WindowSpec
,则可以执行以下操作(同样,这是2.0):
import org.apache.spark.sql.expressions._
// the trick that has performance cost (!)
val window = Window.orderBy()
df.withColumn("max", max('id) over window).show
请注意,该代码有Spark本身报告的严重性能问题:
警告WindowExec:没有为窗口操作定义的分区!将所有数据移至单个分区,这可能会导致严重的性能下降。