>我有csv格式文件,由分隔符管道"|"分隔。数据集有 2 列,如下所示。
Column1|Column2
1|Name_a
2|Name_b
但有时我们只收到一个列值,而另一个则丢失,如下所示
Column1|Column2
1|Name_a
2|Name_b
3
4
5|Name_c
6
7|Name_f
因此,对于上面的示例,任何具有不匹配的列 no 的行对我们来说都是垃圾值,它将是列值为 3, 4, and 6
的行,我们希望丢弃这些行。有什么直接的方法可以丢弃这些行,在从 spark-shell 读取数据时没有异常,如下所示。
val readFile = spark.read.option("delimiter", "|").csv("File.csv").toDF(Seq("Column1", "Column2"): _*)
当我们尝试读取文件时,我们得到以下异常。
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (1): _c0
New column names (2): Column1, Column2
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)
... 49 elided
您可以指定数据文件的架构,并允许某些列可为空。在 scala 中,它可能看起来像:
val schm = StructType(
StructField("Column1", StringType, nullable = true) ::
StructField("Column3", StringType, nullable = true) :: Nil)
val readFile = spark.read.
option("delimiter", "|")
.schema(schm)
.csv("File.csv").toDF
比可以按列筛选数据集不为空。
只需在阅读时将DROPMALFORMED
模式添加到选项中,如下所示。设置此选项会使 Spark 删除损坏的记录。
val readFile = spark.read
.option("delimiter", "|")
.option("mode", "DROPMALFORMED") // Option to drop invalid rows.
.csv("File.csv")
.toDF(Seq("Column1", "Column2"): _*)
此处对此进行了记录。