Casting a column in a Spark DataFrame without creating nulls



Is there a way to cast a column in Spark and have it fail on a type mismatch instead of returning null?

As an example, I have a DF with all string columns, one of which I want to cast to a date:

+----------+------------+------------+
|   service|   eventType|process_date|
+----------+------------+------------+
| myservice| myeventtype|  2020-10-15|
| myservice| myeventtype|  2020-02-15|
|myservice2|myeventtype3|  notADate  |
+----------+------------+------------+

If I try to cast it with the standard cast function, df.withColumn("process_date", df("process_date").cast(targetType)), it replaces the bad data with null:

+----------+------------+------------+
|   service|   eventType|process_date|
+----------+------------+------------+
| myservice| myeventtype|  2020-10-15|
| myservice| myeventtype|  2020-02-15|
|myservice2|myeventtype3|        null|
+----------+------------+------------+

Using this function in my current program could lead to dangerous data loss that I might not catch until it is too late.
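For reference, a minimal reproduction of the behavior (a sketch; the session setup and the DateType import are mine, the data matches the table above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DateType

val spark = SparkSession.builder.master("local[2]").appName("cast-repro").getOrCreate()
import spark.implicits._

val df = List(
  ("myservice", "myeventtype", "2020-10-15"),
  ("myservice", "myeventtype", "2020-02-15"),
  ("myservice2", "myeventtype3", "notADate")
).toDF("service", "eventType", "process_date")

// cast silently turns the unparsable "notADate" into null
df.withColumn("process_date", df("process_date").cast(DateType)).show()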

I found two ways to do what you want.

First, if you really want the process to fail when a date is not parsable, you can use a UDF:

import java.time.LocalDate
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DateType

object Data {
  val tuples = List(
    ("myservice", "myeventtype", "2020-10-15"),
    ("myservice", "myeventtype", "2020-02-15"),
    ("myservice2", "myeventtype3", "notADate")
  )
}

object BadDates {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]").appName("Simple Application").getOrCreate()
    import spark.implicits._

    val dfBad = Data.tuples.toDF("service", "eventType", "process_date")

    // LocalDate.parse throws DateTimeParseException on malformed input,
    // so the job fails instead of silently producing null
    val dateConvertUdf = udf { str: String => java.sql.Date.valueOf(LocalDate.parse(str)) }

    dfBad
      .withColumn("process_date", dateConvertUdf(col("process_date")))
      .show()
  }
}

This fails with the following exception:

Exception in thread "main" org.apache.spark.SparkException: Failed to execute user defined function(BadDates$$$Lambda$1122/934288610: (string) => date)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1130)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
...
Caused by: java.time.format.DateTimeParseException: Text 'notADate' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
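If you want the failure to point at the offending value rather than a bare DateTimeParseException, you can wrap the parse (a variant sketch of the UDF above, not part of the original answer):

val strictDateUdf = udf { str: String =>
  try java.sql.Date.valueOf(LocalDate.parse(str))
  catch {
    case e: java.time.format.DateTimeParseException =>
      // include the bad input in the error so it is easy to track down
      throw new IllegalArgumentException(s"Cannot parse date value '$str'", e)
  }
}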

Alternatively, you can perform the cast and then check whether there is any row where the cast value is null but the original value is not:

object BadDates2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]").appName("Simple Application").getOrCreate()
    import spark.implicits._

    val dfBad = Data.tuples.toDF("service", "eventType", "process_date")

    // cast() turns unparsable values into null, so a row where the original
    // is non-null but the cast result is null marks bad data
    val df = dfBad
      .withColumn("process_date_dat", col("process_date").cast(DateType))

    val badLines = df
      .filter(col("process_date").isNotNull && col("process_date_dat").isNull)
      .count()

    assert(badLines == 0) // This will fail, badLines is 1
  }
}
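If you would rather see which rows are bad before aborting, the same check can surface them first (a small extension sketch of the code above):

val badRows = df
  .filter(col("process_date").isNotNull && col("process_date_dat").isNull)
if (badRows.head(1).nonEmpty) {
  badRows.show(truncate = false) // print the offending rows
  throw new IllegalStateException("Unparsable values found in process_date")
}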
