如何使用用column计算列中的最大值



我正在尝试计算Spark 1.6.1 中的以下数据框的最大值:

val df = sc.parallelize(Seq(1,2,3)).toDF("id")

第一种方法是选择最大值,并且可以按预期工作:

df.select(max($"id")).show

第二种方法可能是使用withColumn如下:

df.withColumn("max", max($"id")).show

,但不幸的是,由于以下错误消息而失败:

org.apache.spark.sql.sql.analysisexception:expression'id'都不是 存在于组中,也不是汇总函数。添加到组 如果您不关心哪个值 你得到。;

如何在没有任何WindowgroupBy的情况下计算withColumn函数中的最大值?如果不可能,在这种特定情况下,我该如何使用Window

正确的方法是将汇总计算为单独的查询,并与实际结果结合使用。与窗口函数不同,在这里的许多答案中建议,它不需要单个分区的混音,并且适用于大型数据集。

可以使用单独的操作完成withColumn

import org.apache.spark.sql.functions.{lit, max}
df.withColumn("max", lit(df.agg(max($"id")).as[Int].first))

,使用任何一个明确的方法要干净:

import org.apache.spark.sql.functions.broadcast
df.crossJoin(broadcast(df.agg(max($"id") as "max")))

或隐式交叉加入:

spark.conf.set("spark.sql.crossJoin.enabled", true)
df.join(broadcast(df.agg(max($"id") as "max")))

apache spark中的功能很少。

  • 聚集功能,例如麦克斯,当我们想将多行汇总到一个
  • 当我们想将一个列转换为另一列
  • 收集功能,例如爆炸,当一排将扩展到多行时。

隐式聚合

当我们想将更多的行聚集到一个时。

以下代码内部具有聚合。

df.select(max($"id")).explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[max(id#3)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(id#3)])
      +- *Project [value#1 AS id#3]
         +- Scan ExistingRDD[value#1]

我们还可以在选择中使用多个聚合函数。

df.select(max($"id"), min($"id")).explain

聚合函数无法直接与非聚合函数混合

以下代码将报告错误。

df.select(max($"id"), $"id")
df.withColumn("max", max($"id"))

因为 max($"id")的值很少,然后 $"id"

over

汇总

在这种情况下,将分析函数应用于结果集中的所有行。

中的所有行。

我们可以使用

df.select(max($"id").over, $"id").show

df.withColumn("max", max($"id").over).show

这是Spark 2.0 此处。

使用withColumn和窗口功能,可能如下:

df.withColumn("max", max('id) over)

请注意,要假设一个"空"窗口的空over(相当于over ())。

但是,如果您需要更完整的WindowSpec,则可以执行以下操作(同样,这是2.0):

import org.apache.spark.sql.expressions._
// the trick that has performance cost (!)
val window = Window.orderBy()
df.withColumn("max", max('id) over window).show

请注意,该代码有Spark本身报告的严重性能问题:

警告WindowExec:没有为窗口操作定义的分区!将所有数据移至单个分区,这可能会导致严重的性能下降。

相关内容

  • 没有找到相关文章

最新更新