如何在解析期间获得无效数据的计数



我们正在使用spark解析一个大型csv文件,该文件可能包含无效数据。我们希望将有效数据保存到数据存储中,并返回导入了多少有效数据和多少无效数据。

我想知道我们如何在spark中做到这一点,读取数据时的标准方法是什么?

我目前的方法使用Accumulator,但由于Accumulator在spark中的工作方式,它不准确。

// we define case class CSVInputData: all fields are defined as string
val csvInput = spark.read.option("header", "true").csv(csvFile).as[CSVInputData]
val newDS = csvInput
  .flatMap { row =>
    Try {
      val data = new DomainData()
      data.setScore(row.score.trim.toDouble)
      data.setId(UUID.randomUUID().toString())
      data.setDate(Util.parseDate(row.eventTime.trim))
      data.setUpdateDate(new Date())
      data
    } match {
      case Success(map) => Seq(map)
      case _ => {
        errorAcc.add(1)
        Seq()
      }
    }
}

我尝试使用Either,但它失败了,例外:

. lang。noclassdeffoundererror:没有Java类对应的产品与Serializable与scale .util. either [xx. csvinputdata,xx. csvinputdata]。DomainData]发现

我认为要么不工作与spark 2.0数据集api:

      spark.read.option("header", "true").csv("any.csv").map { row => 
      try {
        Right("")
      } catch {  case e: Throwable => Left(""); }
      }

如果我们使用sc(rdd api),它可以工作:

      sc.parallelize('a' to 'z').map { row => 
      try {
        Right("")
      } catch {  case e: Throwable => Left(""); }
      }.collect()

目前最新的scala http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either:都没有实现Serializable trait

sealed abstract class Either[+A, +B] extends AnyRef

在未来的2.12 http://www.scala-lang.org/api/2.12.x/scala/util/Either.html,它做:

sealed abstract class Either[+A, +B] extends Product with Serializable

更新2与解决方案

更多信息在Spark ETL:使用要么处理无效数据

由于spark数据集不工作,所以解决办法是调用ds。Rdd,然后使用try-left-right来捕获有效和无效的数据。

   spark.read.option("header", "true").csv("/Users/yyuan/jyuan/1.csv").rdd.map ( { row => 
   try {
     Right("")
   } catch {  case e: Throwable => Left(""); }
   }).collect()

您是否考虑过使用Either

val counts = csvInput
      .map { row =>
        try {
          val data = new DomainData()
          data.setScore(row.score.trim.toDouble)
          data.setId(UUID.randomUUID().toString())
          data.setDate(Util.parseDate(row.eventTime.trim))
          data.setUpdateDate(new Date())
          Right(data)
        } catch {
          case e: Throwable => Left(row)
        }
      }
      val failedCount = counts.map(_.left).filter(_.e.isLeft).count()
      val successCount = counts.map(_.right).filter(_.e.isRight).count()

你试过Spark DDQ吗?它有你需要的大部分数据质量规则。您甚至可以扩展和自定义它。

链接:https://github.com/FRosner/drunken-data-quality

相关内容

  • 没有找到相关文章

最新更新