我有一个具有以下结构的CSV:
标题,标头,标头,标头,标头
val1,val2,val3,val4,val5
val1,val2,null,val4,val5
val1,val2,val3,null,val5
我需要做的是滤除标题和在特定位置包含无效值的数据线(可以在Val3处有null,而不是Val4的null)。我做了一个RDD并在逗号上拆分了线,我希望能像阵列的索引位置一样访问每条线。但是我找不到如何进行比较。我可以用:
提取字段rdd.map(values =>(值(2))
您如何进行比较?特别是"不包含"。我认为有一种可用的比较方法,还是此问题需要元组和!包含?
假设您已经定义了一种用于包装这些值的类型,请说:
case class Record(val1: String, val2: Option[String], val3: String, val4: Option[String])
val rdd: RDD[Record] = ...
rdd.filter(record => record.val2.isDefined && record.val4.isDefined)
我希望这有帮助。
如果您使用DataFrame
S而不是RDD
S,则将使用filter
与Boolean Column
操作一起使用。
假设val4
和val5
都不应为null。
如果您的CSV看起来像这样:
evan@vbox ~ > cat dat_1.csv
header1,header2,header3,header4,header5
val1,val2,val3,val4,val5
val1,val2,,val4,val5
val1,val2,val3,,val5
然后您的代码看起来像:
scala> val dat_1 = spark.read.option("header", true).csv("dat_1.csv")
dat_1: org.apache.spark.sql.DataFrame = [header1: string, header2: string ... 3 more fields]
scala> dat_1.show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
| val1| val2| val3| val4| val5|
| val1| val2| null| val4| val5|
| val1| val2| val3| null| val5|
+-------+-------+-------+-------+-------+
scala> data1.filter($"header4".isNotNull && $"header5".isNotNull).show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
| val1| val2| val3| val4| val5|
| val1| val2| null| val4| val5|
+-------+-------+-------+-------+-------+
否则,如果您的数据看起来像这样:
evan@vbox ~ > cat dat_2.csv
header1,header2,header3,header4,header5
val1,val2,val3,val4,val5
val1,val2,null,val4,val5
val1,val2,val3,null,val5
然后您的代码看起来像这样:
scala> val dat_2 = spark.read.option("header", true).csv("dat_2.csv")
dat_2: org.apache.spark.sql.DataFrame = [header1: string, header2: string ... 3 more fields]
scala> dat_2.show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
| val1| val2| val3| val4| val5|
| val1| val2| null| val4| val5|
| val1| val2| val3| null| val5|
+-------+-------+-------+-------+-------+
scala> dat_2.filter($"header4" =!= "null" && $"header5" =!= "null").show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
| val1| val2| val3| val4| val5|
| val1| val2| null| val4| val5|
+-------+-------+-------+-------+-------+
输入文件中的null值未用文件中表示的方式表示:
header,header,header,header,header
val1, val2, val3, val4, val5
val1, val2, null, val4, val5
val1, val2, val3, null, val5
应该是:
header,header,header,header,header
val1, val2, val3, val4, val5
val1, val2, null, val4, val5
val1, val2, val3,, val5
解决方案:使用mappartitionswithIndex删除0次索引的第一迭代器将从输入文件中过滤标头,而在第四字段上的使用!="!
**scala>** sc.textFile("/User/VJ/testfile").
mapPartitionsWithIndex((x,y) => if (x==0) y.drop(1) else y).
filter(x=>x.split(",")(3) != "" ).
take(5).foreach(println)
所需的输出:
val1, val2, val3, val4, val5
val1, val2, null, val4, val5
在这里示例https://tips-to-code.blogspot.com/2018/08/nulls-in-scala-spark.html
谢谢vishal。