我们正在使用spark解析一个大型csv文件,该文件可能包含无效数据。我们希望将有效数据保存到数据存储中,并返回导入了多少有效数据和多少无效数据。
我想知道我们如何在spark中做到这一点,读取数据时的标准方法是什么?
我目前的方法使用Accumulator
,但由于Accumulator
在spark中的工作方式,它不准确。
// we define case class CSVInputData: all fields are defined as string
val csvInput = spark.read.option("header", "true").csv(csvFile).as[CSVInputData]
val newDS = csvInput
.flatMap { row =>
Try {
val data = new DomainData()
data.setScore(row.score.trim.toDouble)
data.setId(UUID.randomUUID().toString())
data.setDate(Util.parseDate(row.eventTime.trim))
data.setUpdateDate(new Date())
data
} match {
case Success(map) => Seq(map)
case _ => {
errorAcc.add(1)
Seq()
}
}
}
我尝试使用Either
,但它失败了,例外:
. lang。noclassdeffoundererror:没有Java类对应的产品与Serializable与scale .util. either [xx. csvinputdata,xx. csvinputdata]。DomainData]发现
我认为要么不工作与spark 2.0数据集api:
spark.read.option("header", "true").csv("any.csv").map { row =>
try {
Right("")
} catch { case e: Throwable => Left(""); }
}
如果我们使用sc(rdd api),它可以工作:
sc.parallelize('a' to 'z').map { row =>
try {
Right("")
} catch { case e: Throwable => Left(""); }
}.collect()
目前最新的scala http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either:都没有实现Serializable trait
sealed abstract class Either[+A, +B] extends AnyRef
在未来的2.12 http://www.scala-lang.org/api/2.12.x/scala/util/Either.html,它做:
sealed abstract class Either[+A, +B] extends Product with Serializable
更新2与解决方案
更多信息在Spark ETL:使用要么处理无效数据
由于spark数据集不工作,所以解决办法是调用ds。Rdd,然后使用try-left-right来捕获有效和无效的数据。
spark.read.option("header", "true").csv("/Users/yyuan/jyuan/1.csv").rdd.map ( { row =>
try {
Right("")
} catch { case e: Throwable => Left(""); }
}).collect()
您是否考虑过使用Either
val counts = csvInput
.map { row =>
try {
val data = new DomainData()
data.setScore(row.score.trim.toDouble)
data.setId(UUID.randomUUID().toString())
data.setDate(Util.parseDate(row.eventTime.trim))
data.setUpdateDate(new Date())
Right(data)
} catch {
case e: Throwable => Left(row)
}
}
val failedCount = counts.map(_.left).filter(_.e.isLeft).count()
val successCount = counts.map(_.right).filter(_.e.isRight).count()
你试过Spark DDQ吗?它有你需要的大部分数据质量规则。您甚至可以扩展和自定义它。
链接:https://github.com/FRosner/drunken-data-quality