如何通过数据框中的数组列的索引计算均值



我正在使用Spark 2.2。我有一个关于使用ArrayType的基本问题。我没有找到可使用的内置聚合功能。

id列和ArrayTypevalues列给定一个DataFrame

我们要按ID进行分组,然后按索引计算平均值。

所以给定以下输入

{"id": 1, "values":[1.0, 3.0]}
{"id": 1, "values":[3.0, 7.0]}
{"id": 2, "values":[2.0, 4.0]}

我们想要此输出

{"id": 1, "values":[2.0, 5.0]}
{"id": 2, "values":[2.0, 4.0]}

我想出了一个使用 udaf 的解决方案,下面是CF代码。

是否有更好的方法(例如,不使用UDAF)在性能方面?

  val meanByIndex = new UserDefinedAggregateFunction {
    override def inputSchema: StructType =
      StructType(
        StructField("values", ArrayType(DoubleType)) :: Nil
      )
    override def dataType: DataType = ArrayType(DoubleType)
    override def deterministic: Boolean = true
    override def update(buffer: MutableAggregationBuffer, row: Row): Unit = {
      buffer.update(0, buffer.getAs[Long](0) + 1)
      buffer.update(1, sumSeq(buffer.getAs[Seq[Double]](1), row.getAs[Seq[Double]](0))
      )
    }
    override def bufferSchema: StructType =
      StructType(
        StructField("size", LongType) ::
          StructField("sum", ArrayType(DoubleType)) :: Nil
      )
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0, buffer1.getAs[Long](0) + buffer2.getAs[Long](0))
      buffer1.update(1, sumSeq(buffer1.getAs[Seq[Double]](1), buffer2.getAs[Seq[Double]](1))
      )
    }
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, 0L)
      buffer.update(1, Seq.empty[Double])
    }
    override def evaluate(buffer: Row): Any = {
      buffer.getAs[Seq[Double]](1).map(_ / buffer.getAs[Long](0))
    }
    private def sumSeq(s1: Seq[Double], s2: Seq[Double]) = {
      if (s1.isEmpty)
        s2
      else {
        s1.zip(s2).map { case (v1, v2) => v1 + v2 }
      }
    }
  }

[更新]有关 @user6910411的答案,我已经比较了执行计划。

使用udaf

SortAggregate
+- *Sort [id#1 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#1, 200)
      +- SortAggregate
         +- *Sort [id#1 ASC NULLS FIRST], false, 0
            +- *FileScan json

没有UDAF

*HashAggregate
+- Exchange hashpartitioning(id#1, 200)
   +- *HashAggregate
      +- *FileScan json

结论:没有UDAF的解决方案更好,因为我们不需要对整个数据集进行分类。

带有固定尺寸数组的

我不会打扰UserDefinedAggregateFunction并使用标准聚合:

import org.apache.spark.sql.functions._
val df = Seq(
  (1, Seq(1.0, 3.0)),
  (1, Seq(3.0, 7.0)),
  (2, Seq(2.0, 4.0))
).toDF("id", "values")
df.groupBy("id").agg(array((0 until 2) map (i => avg($"values"(i))): _*))
+---+-------------------------------------+
| id|array(avg(values[0]), avg(values[1]))|
+---+-------------------------------------+
|  1|                           [2.0, 5.0]|
|  2|                           [2.0, 4.0]|
+---+-------------------------------------+

相关内容

  • 没有找到相关文章

最新更新