我正在使用Scala开发Spark,并且我没有Scala的任何背景。我还没有得到Valueerror,但是我正在为我的代码准备ValueError处理程序。
|location|arrDate|deptDate|
|JFK |1201 |1209 |
|LAX |1208 |1212 |
|NYC | |1209 |
|22 |1201 |1209 |
|SFO |1202 |1209 |
如果我们有这样的数据,我想将第三行和第四行存储到错误中。dat然后再次处理第五行。在错误日志中,我想将数据信息(例如哪个文件,行数和错误详细信息)放置。对于Logger,我现在使用Log4J。
实施该功能的最佳方法是什么?你们能帮我吗?
我假设所有三列都是字符串。在这种情况下,我将使用以下片段解决此问题。我创建了两个UDF来检查错误记录。
- 如果字段的数字字符[
isNumber
] - 如果字符串字段为空[
isEmpty
]
代码片段
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.udf
val df = rdd.zipWithIndex.map({case ((x,y,z),index) => (index+1,x,y,z)}).toDF("row_num", "c1", "c2", "c3")
val isNumber = udf((x: String) => x.replaceAll("\d","") == "")
val isEmpty = udf((x: String) => x.trim.length==0)
val errDF = df.filter(isNumber($"c1") || isEmpty($"c2"))
val validDF = df.filter(!(isNumber($"c1") || isEmpty($"c2")))
scala> df.show()
+-------+---+-----+-----+
|row_num| c1| c2| c3|
+-------+---+-----+-----+
| 1|JFK| 1201| 1209|
| 2|LAX| 1208| 1212|
| 3|NYC| | 1209|
| 4| 22| 1201| 1209|
| 5|SFO| 1202| 1209|
+-------+---+-----+-----+
scala> errDF.show()
+-------+---+----+----+
|row_num| c1| c2| c3|
+-------+---+----+----+
| 3|NYC| |1209|
| 4| 22|1201|1209|
+-------+---+----+----+