给定一个数据帧:
val df = sc.parallelize(Seq(("foo", ArrayBuffer(null,"bar",null)), ("bar", ArrayBuffer("one","two",null)))).toDF("key", "value")
df.show
+---+--------------------------+
|key| value|
+---+--------------------------+
|foo|ArrayBuffer(null,bar,null)|
|bar|ArrayBuffer(one, two,null)|
+---+--------------------------+
我想从第 value
列中删除null
.删除后,数据帧应如下所示:
+---+--------------------------+
|key| value|
+---+--------------------------+
|foo|ArrayBuffer(bar) |
|bar|ArrayBuffer(one, two) |
+---+--------------------------+
欢迎任何建议。 10x
这里需要一个UDF。例如,使用flatMap
:
val filterOutNull = udf((xs: Seq[String]) =>
Option(xs).map(_.flatMap(Option(_))))
df.withColumn("value", filterOutNull($"value"))
其中,带有map
句柄的外部Option
NULL
列:
Option(null: Seq[String]).map(identity)
Option[Seq[String]] = None
Option(Seq("foo", null, "bar")).map(identity)
Option[Seq[String]] = Some(List(foo, null, bar))
并确保当输入通过映射NULL
/null
时,我们不会因 NPE 而失败
NULL -> null -> None -> None -> NULL
其中null
是Scala null
,NULL
是SQL NULL
。
内部flatMap
展平一系列Options
有效地过滤nulls
:
Seq("foo", null, "bar").flatMap(Option(_))
Seq[String] = List(foo, bar)
一个更命令式的等价物可能是这样的:
val imperativeFilterOutNull = udf((xs: Seq[String]) =>
if (xs == null) xs
else for {
x <- xs
if x != null
} yield x)
选项 1:使用 UDF:
val filterNull = udf((arr : Seq[String]) => arr.filter((x: String) => x != null))
df.withColumn("value", filterNull($"value")).show()
选项 2:无 UDF
df.withColumn("value", explode($"value")).filter($"value".isNotNull).groupBy("key").agg(collect_list($"value")).show()
请注意,这效率较低...
你也可以使用spark-daria,它有:com.github.mrpowers.spark.daria.sql.functions.arrayExNull
从文档中:
类似数组,但不包含空元素