我正在做一个更大的项目,在这个项目中,这个函数是从withColumn中调用的。它有一些不同的操作,但这里有一个示例用法:
case class Employee(id: Int, name: String)
val df = Seq(new Employee(1, "Elia"), new Employee(2, null), new Employee(3, "Fang")).toDF
df.show
+---+----+
| id|name|
+---+----+
| 1|Elia|
| 2|null|
| 3|Fang|
+---+----+
def test1: Column = {
concat(col("id"), col("name"))
}
df.withColumn("concat", test1).show
+---+----+------+
| id|name|concat|
+---+----+------+
| 1|Elia| 1Elia|
| 2|null| null|
| 3|Fang| 3Fang|
+---+----+------+
所以我想做的是,如果其中一列有任何null,则抛出一个异常。test1函数中的类似内容:
if(col("id").isNull.sum > 0){
throw IllegalArgumentException("id can not have any nulls")
}
但显然列是无法求和的。我还尝试了sum(col("id"(.isNull(,但这同样无效。我在stackoverflow上看到的所有例子都与使用df级别的函数有关,例如df.filter("id为null"(.count>0。但在我使用的框架中,为了进行简单的qc检查以抛出更准确的异常,这需要进行相当大规模的重构。我正在修改的函数的作用域无法访问数据帧。我想做的事情可能吗?
谢谢!
您可以为这种情况定义UDF(用户定义函数(。看看这个例子:
import org.apache.spark.sql.{functions => F}
val testUdf = F.udf((id: Option[Int], name: Option[String]) => {
(id, name) match {
case (None, _) => throw new RuntimeException("id can not have any nulls")
case (_, None) => throw new RuntimeException("name can not have any nulls")
case (Some(id), Some(name)) => s"$id$name"
}
})
df.withColumn("concat", testUdf($"id", $"name")).show
根据您使用的spark版本,您有几个选项
如果火花版本<3.1,你可以这样使用udf:
val throwExUdf = udf(
(d: Option[String]) => {
d match {
case None => throw new RuntimeException("message")
case Some(v) => v
}
}
)
df.withColumn(
"concat",
when($"name".isNotNull, concat($"id", $"name")).otherwise(throwExUdf($"name"))
)
带火花版本>=3.1您还可以选择使用内置函数raise_error
(doc(
像这样:
df.withColumn(
"concat",
when($"name".isNotNull, concat($"id", $"name")).otherwise(raise_error(lit("Name is null")))
)