在UDAF之后,Spark将SQL行展平为新列



我有一个Spark数据帧,在上面我做了一个groupBy,一个用户定义的聚合和一个库聚合:

data.groupBy("var1").agg(sum("var2"), makefreqs("var3")).first

产生类似的东西

org.apache.spark.sql.Row = ["var1_value1", 219, WrappedArray(0.6, 0.1, 0.3)]

我最终想做的是将所有行(而不仅仅是.first)转换为以下格式:

org.apache.spark.mllib.linalg.Vector = [219, 0.6, 0.1, 0.3]

我一直纠结于如何在sql中将WrappedArray(0.6、0.1、0.3)扁平化为三个新列(最好带有名称)。一行做这件事最聪明的方法是什么?

您可以在Apache DataFu中找到所需的开箱即用的解决方案。explodeArray方法正是您所需要的:

import datafu.spark.DataFrameOps._
val df = sc.parallelize(Seq(("var1_value1",219,Array(0.6, 0.1, 0.3)))).toDF
df.explodeArray(col("_3"), "array").show

这将产生:

+-----------+---+---------------+------+------+------+
|    _1     | _2|       _3      |array0|array1|array2|
+-----------+---+---------------+------+------+------+
|var1_value1|219|[0.6, 0.1, 0.3]|   0.6|   0.1|   0.3|
+-----------+---+---------------+------+------+------+

请考虑,为了评估要为数组创建多少列,此方法会评估数据帧——如果计算成本很高,则应该缓存数据帧。

我不确定最聪明的方法,但以下是我的建议。

一种方法是,您可以使用map函数,但它将成为RDD,因此您需要将其转换回DataFrame。

val df = data.groupBy("var1").agg(sum("var2"), makefreqs("var3"))
// implicitly pass context to toDF
import sqlContext.implicits._
df.map { case Row(var1, sumVar2, array: WrappedArray) => 
  Row(var1, sumVar2, array(0), array(1), array(2) }
  .toDF("var1", "sum_var2", "a1", "a2", "a3")

如果你不喜欢来回转换,你可以使用udf来完成任务。

val arrayToColumn = (index: Int) => udf { (array: Seq[Double]) => array(index) }
df
  .withColumn("a0", arrayToColumn(0)(df("array_col")))
  .withColumn("a1", arrayToColumn(1)(df("array_col")))
  .withColumn("a2", arrayToColumn(2)(df("array_col")))

最新更新