我有一个火花数据框架,例如:
+-------------+------------------------------------------+
|a |destination |
+-------------+------------------------------------------+
|[a,Alice,1] |[[b,Bob,0], [e,Esther,0], [h,Fraudster,1]]|
|[e,Esther,0] |[[f,Fanny,0], [d,David,0]] |
|[c,Charlie,0]|[[b,Bob,0]] |
|[b,Bob,0] |[[c,Charlie,0]] |
|[f,Fanny,0] |[[c,Charlie,0], [h,Fraudster,1]] |
|[d,David,0] |[[a,Alice,1], [e,Esther,0]] |
+-------------+------------------------------------------+
带有
的模式|-- destination: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- var_only_0_and_1: integer (nullable = false)
如何构造在destination
列上运行的UDF,即由collect_list
UDF的Spark创建的包装阵列来计算可变var_only_0_and_1
的平均值?
您可以直接在数组上操作,只要您获得了udf正确的方法签名(过去使我震惊)。阵列列被UDF视为SEQ,而构造作为行,因此您需要类似的东西:
def test (in:Seq[Row]): String = {
// return a named field from the second struct in the array
in(2).getAs[String]("var_only_0_and_1")
}
var udftest = udf(test _)
我已经在看起来像您的数据上对此进行了测试。我猜想在Seq [Row]的字段上迭代以实现您想要的东西。
说实话,我完全不确定这样做的类型安全性,而且我相信爆炸是按照@ayplam进行的优选方法。内置功能通常比开发人员提供的任何UDF都快,因为Spark无法优化UDF。
您可以使用本机Spark SQL功能。
df.withColumn("dest",explode(col("destination")).
groupBy("a").agg(avg(col("dest").getField("var_only_0_and_1")))