如何验证Spark Dataframe的内容



我有以下Scala Spark代码库,它工作得很好,但不应该。

第二列有混合类型的数据,而在Schema中我将其定义为IntegerType。我的实际程序有100多个列,并且在转换后不断导出多个子DataFrames

如何验证RDDDataFrame字段的内容具有正确的数据类型值,从而忽略无效行或将列的内容更改为某些默认值?DataFrameRDD的数据质量检查的更多指针是赞赏的。

var theSeq = Seq(("X01", "41"),
    ("X01", 41),
    ("X01", 41),
    ("X02", "ab"),
    ("X02", "%%"))
val newRdd = sc.parallelize(theSeq)
val rowRdd = newRdd.map(r => Row(r._1, r._2))
val theSchema = StructType(Seq(StructField("ID", StringType, true),
    StructField("Age", IntegerType, true)))
val theNewDF = sqc.createDataFrame(rowRdd, theSchema)
theNewDF.show()  

首先,传递schema只是一种避免类型推断的方法。它在DataFrame创建期间不被验证或强制执行。顺便说一句,我不会将ClassCastException描述为运行良好。刚才我还以为你真的发现了一个bug呢。

我认为重要的问题是你首先是如何获得像theSeq/newRdd这样的数据的。它是您自己解析的东西,还是从外部组件接收的?简单地看一下类型(分别是Seq[(String, Any)]/RDD[(String, Any)]),您已经知道它不是DataFrame的有效输入。在这个级别上处理事情的方法可能是采用静态类型。Scala提供了许多简洁的方法来处理意外情况(Try, Either, Option),其中最后一个是最简单的,并且作为奖励与Spark SQL一起工作得很好。一种非常简单的处理方式可以像这样

def validateInt(x: Any) = x match {
  case x: Int => Some(x)
  case _ => None
}
def validateString(x: Any) = x match { 
  case x: String => Some(x)
  case _ => None
}
val newRddOption: RDD[(Option[String], Option[Int])] = newRdd.map{
  case (id, age) => (validateString(id), validateInt(age))}

因为Options可以很容易地组成,你可以添加额外的检查,像这样:

def validateAge(age: Int) = {
  if(age >= 0 && age < 150) Some(age)
  else None
}
val newRddValidated: RDD[(Option[String], Option[Int])] = newRddOption.map{
  case (id, age) => (id, age.flatMap(validateAge))}

下一个代替Row,这是一个非常粗糙的容器,我将使用案例类:

case class Record(id: Option[String], age: Option[Int])
val records: RDD[Record] = newRddValidated.map{case (id, age) => Record(id, age)}

此刻你所要做的就是调用toDF:

import org.apache.spark.sql.DataFrame
val df: DataFrame = records.toDF
df.printSchema
// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)

这是一种比较困难的方法,但可以说是一种更优雅的方法。较快的一种方法是让SQL强制转换系统为您做一项工作。首先让我们将所有内容转换为Strings:

val stringRdd: RDD[(String, String)] = sc.parallelize(theSeq).map(
  p => (p._1.toString, p._2.toString))

下一步创建一个DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
val df: DataFrame = stringRdd.toDF("id", "age")
val expectedTypes = Seq(StringType, IntegerType)
val exprs: Seq[Column] = df.columns.zip(expectedTypes).map{
  case (c, t) => col(c).cast(t).alias(c)}
val dfProcessed: DataFrame = df.select(exprs: _*)

和结果:

dfProcessed.printSchema
// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)

dfProcessed.show
// +---+----+
// | id| age|
// +---+----+
// |X01|  41|
// |X01|  41|
// |X01|  41|
// |X02|null|
// |X02|null|
// +---+----+

1.4及以上版本

import org.apache.spark.sql.execution.debug._
theNewDF.typeCheck

它是通过SPARK-9754被移除的。我没有检查,但是我认为typeCheck事先变成了sqlContext.debug

相关内容

  • 没有找到相关文章