Spark UDF在数组上运行



我有一个火花数据框架,例如:

+-------------+------------------------------------------+
|a            |destination                               |
+-------------+------------------------------------------+
|[a,Alice,1]  |[[b,Bob,0], [e,Esther,0], [h,Fraudster,1]]|
|[e,Esther,0] |[[f,Fanny,0], [d,David,0]]                |
|[c,Charlie,0]|[[b,Bob,0]]                               |
|[b,Bob,0]    |[[c,Charlie,0]]                           |
|[f,Fanny,0]  |[[c,Charlie,0], [h,Fraudster,1]]          |
|[d,David,0]  |[[a,Alice,1], [e,Esther,0]]               |
+-------------+------------------------------------------+

带有

的模式
|-- destination: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- var_only_0_and_1: integer (nullable = false)

如何构造在destination列上运行的UDF,即由collect_list UDF的Spark创建的包装阵列来计算可变var_only_0_and_1的平均值?

您可以直接在数组上操作,只要您获得了udf正确的方法签名(过去使我震惊)。阵列列被UDF视为SEQ,而构造作为行,因此您需要类似的东西:

def test (in:Seq[Row]): String = {
  // return a named field from the second struct in the array
  in(2).getAs[String]("var_only_0_and_1")
}
var udftest = udf(test _)

我已经在看起来像您的数据上对此进行了测试。我猜想在Seq [Row]的字段上迭代以实现您想要的东西。

说实话,我完全不确定这样做的类型安全性,而且我相信爆炸是按照@ayplam进行的优选方法。内置功能通常比开发人员提供的任何UDF都快,因为Spark无法优化UDF。

您可以使用本机Spark SQL功能。

df.withColumn("dest",explode(col("destination")).
groupBy("a").agg(avg(col("dest").getField("var_only_0_and_1")))

相关内容

  • 没有找到相关文章

最新更新