我正在尝试使用火花数据帧。根据级联框架的先前知识,该框架具有陷阱机制,可将错误的行(具有空值的行)过滤到称为陷阱的单独分流器中。那些不知道的人让我说清楚。当您收到已从文本文件中读取的错误行时。框架要么从整个数据中剔除坏行,要么停止执行。现在在阿帕奇火花中,我观察到坏行并没有阻碍执行。这很好,但是当涉及到从数据中获取业务见解时,数据质量确实很重要!
因此,我有一个包含大量行的文本文件(您可以根据需要选择任何数据集),其中很少有记录包含空值。现在,我将文本文件加载到带有spark.read.csv的数据帧中。现在,我想做的是分析数据帧并动态创建一个名为"isMyRowBad">的列,其中逻辑将一次分析每一行,如果逻辑找到具有空值的行,它会将该特定行上的 isMyRowBad 列标记为 true,并将没有空值的列标记为真, 相应的列是MyRowBad应该为干净的纯行提供假。
为您提供传入和传出数据集的概述
传入数据帧
fname,lname,age
will,smith,40
Dwayne,Nunn,36
Aniruddha,Sinha,
Maria,,22
传出数据帧
fname,lname,age,isMyRowBad
will,smith,40,false
Dwayne,Nunn,36,false
Aniruddha,Sinha,,true
Maria,,22,true
上面对好行和坏行进行分类的方法可能看起来有点愚蠢,但它确实有意义,因为我不需要多次运行过滤器操作。 让我们来看看,如何?
假设我有一个名为 inDf 的数据帧作为输入 Df 和 AnalysedDf:(DataFrame,DataFrame) 作为输出 Df 元组
现在,我确实尝试了这部分代码
val analyzedDf: (DataFrame, DataFrame) = (inputDf.filter(_.anyNull),inputDf.filter(!_.anyNull))
此代码隔离好行和坏行。我同意!但这有一个性能下降,因为过滤器运行两次,这意味着过滤器将遍历数据集两次!(如果您觉得在考虑 50 个字段和至少 584000 行(即 250 MB 的数据)时运行两次过滤器确实有意义,您可以反驳这一点!
还有这个
val analyzedDf: DataFrame = inputDf.select("*").withColumn("isMyRowBad", <this point, I am not able to analyze row>
上面的代码片段显示了我无法弄清楚如何扫描整行并使用布尔值将该行标记为坏的地方。
希望你们都能明白我的目标是什么。如果您在代码片段中找到语法错误,请忽略语法错误,因为我立即在此处输入了它们(将在以后的编辑中更正相同的内容)
请给我一个提示(一点代码片段或伪代码就足够了)如何继续挑战。如果您不明白我打算做什么,请与我联系。
任何帮助将不胜感激。提前感谢!
PS:在BigData/spark/hadoop/scala等方面有很多聪明的人,请你纠正我可能写错的任何一点(概念上)
顺便说一下,下面的代码给了我一个解决方案。请看一看
package aniruddha.data.quality
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
/**
* Created by aniruddha on 8/4/17.
*/
object DataQualityCheck extends App {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val schema: StructType = StructType(List(
StructField("fname", StringType, nullable = true),
StructField("lname", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("pan", StringType, nullable = true),
StructField("married", StringType, nullable = true)
))
val inputDataFrame: DataFrame = spark
.read
.schema(schema)
.option("header",true)
.option("delimiter",",")
.csv("inputData/infile")
//inputDataFrame.show()
val analysedDataFrame: DataFrame = inputDataFrame.select("*").withColumn("isRowBad", when($"pan".isNull||$"lname".isNull||$"married".isNull,true).otherwise(false))
analysedDataFrame show
}
输入
fname,lname,age,pan,married
aniruddha,sinha,23,0AA22,no
balajee,venkatesh,23,0b96,no
warren,shannon,72,,
wes,borland,63,0b22,yes
Rohan,,32,0a96,no
james,bond,66,007,no
输出
+---------+---------+---+-----+-------+--------+
| fname| lname|age| pan|married|isRowBad|
+---------+---------+---+-----+-------+--------+
|aniruddha| sinha| 23|0AA22| no| false|
| balajee|venkatesh| 23| 0b96| no| false|
| warren| shannon| 72| null| null| true|
| wes| borland| 63| 0b22| yes| false|
| Rohan| null| 32| 0a96| no| true|
| james| bond| 66| 007| no| false|
+---------+---------+---+-----+-------+--------+
代码工作正常,但我对 when 函数有问题。我们不能只选择所有列而不对其进行硬编码吗?
据我所知,你不能用内置的csv解析器来做到这一点。如果解析器遇到错误(failFast 模式),您可以停止解析器,但不能进行注释。
但是,您可以使用自定义 csv 解析器执行此操作,该解析器可以在单次传递中处理数据。 除非我们想做一些聪明的类型内省,否则最简单的方法是创建一个帮助程序类来注释文件的结构:
case class CSVColumnDef(colPos: Int, colName: String, colType: String)
val columns = List(CSVColumnDef(0,"fname","String"),CSVColumnDef(1,"lname","String"),CSVColumnDef(2,"age", "Int"))
接下来,我们需要一些函数来 a) 拆分输入,b) 从拆分数据中提取数据,c) 检查行是否错误:
import scala.util.Try
def splitToSeq(delimiter: String) = udf[Seq[String],String](_.split(delimiter))
def extractColumnStr(i: Int) = udf[Option[String],Seq[String]](s => Try(Some(s(i))).getOrElse(None))
def extractColumnInt(i: Int) = udf[Option[Int],Seq[String]](s => Try(Some(s(i).toInt)).getOrElse(None))
def isRowBad(delimiter: String) = udf[Boolean,String](s => {
(s.split(delimiter).length != columns.length) || (s.split(delimiter).exists(_.length==0))
})
要使用这些,我们首先需要读取文本文件(因为我没有它,并且为了允许人们复制这个答案,我将创建一个 rdd):
val input = sc.parallelize(List(("will,smith,40"),("Dwayne,Nunn,36"),("Aniruddha,Sinha,"),("Maria,,22")))
input.take(5).foreach(println)
给定此输入,我们可以创建一个具有单列(原始行)的数据帧,并将拆分列添加到其中:
val delimiter = ","
val raw = "raw"
val delimited = "delimited"
val compDF = input.toDF(raw).withColumn(delimited, splitToSeq(delimiter)(col(raw)))
最后,我们可以提取之前定义的所有列,并检查行是否错误:
val df = columns.foldLeft(compDF){case (acc,column) => column.colType match {
case "Int" => acc.withColumn(column.colName, extractColumnInt(column.colPos)(col(delimited)))
case _ => acc.withColumn(column.colName, extractColumnStr(column.colPos)(col(delimited)))
}}.
withColumn("isMyRowBad", isRowBad(delimiter)(col(raw))).
drop(raw).drop(delimited)
df.show
df.printSchema
这个解决方案的好处是,spark 执行计划器足够聪明,可以将所有这些.withColumn
操作构建到数据的单个传递(map
)中,而不会进行零洗牌。烦人的是,它比使用一个漂亮的闪亮的 csv 库要多得多的开发工作,我们需要以某种方式定义列。如果你想更聪明一点,你可以从文件的第一行获取列名(提示:.mapPartitionsWithIndex
),然后把所有内容解析为字符串。我们也无法定义一个案例类来描述整个 DF,因为您有太多的列无法以这种方式处理解决方案。希望这有帮助...
这可以使用 udf 来完成。尽管 Ben Horsburgh 给出的答案绝对是绝妙的,但我们可以在不深入了解数据帧背后的内部架构的情况下做到这一点。
下面的代码可以给你一个想法
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* Created by vaijnath on 10/4/17.
*/
object DataQualityCheck extends App {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val schema: StructType = StructType(List(
StructField("fname", StringType, nullable = true),
StructField("lname", StringType, nullable = true),
StructField("married", StringType, nullable = true)
))
val inputDataFrame: DataFrame = spark
.read
.schema(schema)
.option("header",false)
.option("delimiter",",")
.csv("hydrograph.engine.spark/testData/inputFiles/delimitedInputFile.txt")
//inputDataFrame.show()
def isBad(row:Row):Boolean={
row.anyNull
}
val simplefun=udf(isBad(_:Row))
val cols=struct(inputDataFrame.schema.fieldNames.map(e=> col(e)):_*)
// println(cols+"******************") //for debugging
val analysedDataFrame: DataFrame = inputDataFrame.withColumn("isRowBad", simplefun(cols))
analysedDataFrame.show
}
如果您遇到任何问题,请回复我。我相信这个解决方案是合适的,因为您似乎在寻找使用数据帧的代码。
谢谢。