如何删除RDD中的所有记录,包括空



我从csv文件加载了一个RDD。但是,此文件包含无效数据。所以,当我尝试用first输出这个RDD的触点时.例外情况是

导致:java.lang.NumberFormat异常:空字符串

我希望找到解决方案,当一条记录包含空字符串时,删除RDD中的所有记录。此外,这个RDD包含的字段太多,因此很难逐个处理每个字段。我记得DataFrame有这样的功能,比如na.drop().我需要这种功能适用于RDD.

我使用的代码是这样的:

//using case class
case class Flight(dest_id:Long, dest:String, crsdeptime:Double, deptime:Double, depdelaymins:Double, crsarrtime:Double)
//defining function
def parseFlight(str: String): Flight = {
  val line = str.split(",")
  Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong)
}
//loading data
val textRDD = sc.textFile("/root/data/data.csv")
val flightsRDD = textRDD.map(parseFlight)

更新

当我使用由DateFrame转换的RDD时。我发现RDD的每一行都是行对象。如何提取一行的某些字段来构建边缘对象?

如果 csv 文件中的标头与 case 类中的变量名称匹配,则更容易将数据作为数据帧读取,然后使用 na.drop()

val flightsDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/root/data/data.csv")
  .na.drop()
  .as[Flight]

如果你想要一个rdd,之后总是可以用flightsDf.rdd转换它。

最新更新