对 Spark 数据帧的同一列执行多个聚合操作



我有三个字符串类型的数组,包含以下信息:

  • groupBy 数组:包含我要作为数据分组依据的列的名称。
  • 聚合数组:包含我要聚合的列的名称。
  • 操作
  • 数组:包含我要执行的聚合操作

我正在尝试使用火花数据帧来实现这一点。Spark 数据帧提供了一个 agg(),您可以在其中传递 Map [String,String](列名和相应的聚合操作)作为输入,但是我想对同一列数据执行不同的聚合操作。关于如何实现这一目标的任何建议?

Scala

例如,您可以使用从名称到函数的定义mapping映射函数列表:

import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column
val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)
val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))
df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show

不幸的是,内部使用的解析器SQLContext不会公开,但您可以随时尝试构建普通的SQL查询:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")
sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

蟒蛇

from pyspark.sql.functions import mean, sum, max, col
df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]
exprs = [f(col(c)) for f in funs for c in aggregate]
# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)

另请参阅:

  • Spark SQL:将聚合函数应用于列列表

对于那些想知道的人,如果没有python中的列表理解,如何编写@zero323答案:

from pyspark.sql.functions import min, max, col
# init your spark dataframe
expr = [min(col("valueName")),max(col("valueName"))]
df.groupBy("keyName").agg(*expr)

做类似的事情

from pyspark.sql import functions as F
df.groupBy('groupByColName') 
  .agg(F.sum('col1').alias('col1_sum'),
       F.max('col2').alias('col2_max'),
       F.avg('col2').alias('col2_avg')) 
  .show()

这是在使用 Scala 时在同一列上应用不同聚合函数的另一种直接方法(这已在 Azure Databricks 中进行了测试)。

val groupByColName = "Store"
val colName = "Weekly_Sales"
df.groupBy(groupByColName)
  .agg(min(colName),
       max(colName),
       round(avg(colName), 2))
  .show()
例如,

如果您想计算 PySpark 数据帧中每列中的零百分比,我们可以为此使用表达式在数据帧的每一列上执行

from pyspark.sql.functions import count,col
    def count_zero_percentage(c):
        
        pred = col(c)==0
        return sum(pred.cast("integer")).alias(c)
    
    df.agg(*[count_zero_percentage(c)/count('*').alias(c) for c in df.columns]).show()
case class soExample(firstName: String, lastName: String, Amount: Int)
val df =  Seq(soExample("me", "zack", 100)).toDF
import org.apache.spark.sql.functions._
val groupped = df.groupBy("firstName", "lastName").agg(
     sum("Amount"),
     mean("Amount"), 
     stddev("Amount"),
     count(lit(1)).alias("numOfRecords")
   ).toDF()
display(groupped)

//由扎克提供 ..

扎克简化帖子的答案 标记为重复Spark Scala 数据帧具有单个分组依据的多个聚合

相关内容

  • 没有找到相关文章

最新更新