Applying validation checks to certain columns in Spark SQL



Validation dataframe:

+---------+---------------------------+-------------------------+
|dataframe|Validation Checks          |cols                     |
+---------+---------------------------+-------------------------+
|Attendee |isEmpty,IsNull             |col1,col2,col3           |
+---------+---------------------------+-------------------------+
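For illustration, a minimal sketch of how such a validation config dataframe could be built inline (the toDF construction and the spark session name are assumptions, not part of the original setup):

import spark.implicits._ // assumes a SparkSession named spark is in scope

// Hypothetical inline build of the validation config shown above
val df = Seq(
  ("Attendee", "isEmpty,isNull", "col1,col2,col3")
).toDF("dataframe", "Validation Checks", "cols")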

Attendee dataframe:

col1    col2    col3
a1      a2      a3
        b2      b3
c1      c2      c3
d1      d2      d3

Expected result dataframe:

col1    col2    col3    status
a1      a2      a3      clean
        b2      b3      dirty
c1      c2      c3      clean
d1      d2      d3      clean

Code used so far:

var columns = df.columns // struct(df.columns map col: _*)
val colDF = df.select(col("dataframe"))
var tablename = colDF.head().toSeq
val checkDF = df.select(col("Validation Checks"))
val opsColDF = df.select(col("cols"))
val opsColumn = opsColDF.columns
println("opsColumn :::" + opsColumn.mkString(","))

If you have a dataframe as

+---------+-----------------+--------------+
|dataframe|Validation Checks|cols          |
+---------+-----------------+--------------+
|Attendee |isEmpty,isNull   |col1,col2,col3|
+---------+-----------------+--------------+

you can use the column values to form a SQL query. I used a udf function to create another column holding a valid query:

import org.apache.spark.sql.functions._

// Every (column, check) pair becomes one predicate; all predicates are
// or-ed together inside a case-when that labels each row clean or dirty
def createQueryUdf = udf((table: String, logic: String, cols: String) => {
  "select *, case when " +
    cols.split(",")
      .map(_.trim)
      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map {
          case y if (y == "isempty") => s"$x like ''" // no isEmpty in SQL, emulate with like ''
          case y => s"$y($x)"                         // e.g. isnull(col1)
        }.mkString(" or "))
      .mkString(" or ") +
    s" then 'dirty' else 'clean' end as status from $table"
})

val dataframeWithQuery = df.withColumn("query",
  createQueryUdf(col("dataframe"), col("Validation Checks"), col("cols")))

So dataframeWithQuery would be

+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dataframe|Validation Checks|cols          |query                                                                                                                                                                 |
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Attendee |isEmpty,isNull   |col1,col2,col3|select *, case when col1 like '' or isnull(col1) or col2 like '' or isnull(col2) or col3 like '' or isnull(col3) then 'dirty' else 'clean' end as status from Attendee|
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Now you can pick the generated queries and run them against the dataframes, but before that, all the dataframes have to be registered as temp views:

attendee.createOrReplaceTempView("Attendee")
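In case the Attendee dataframe only exists as the sample above, here is a minimal sketch that builds it in memory and registers the view (the inline Seq is an assumption; any loading path works just as well):

import spark.implicits._ // assumes a SparkSession named spark is in scope

// Sample Attendee rows from above; the empty col1 in the second row
// is exactly what the isEmpty check should flag as dirty
val attendee = Seq(
  ("a1", "a2", "a3"),
  ("",   "b2", "b3"),
  ("c1", "c2", "c3"),
  ("d1", "d2", "d3")
).toDF("col1", "col2", "col3")

attendee.createOrReplaceTempView("Attendee")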

Then you simply collect the query column and loop through it to apply the query statements:

val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))
for (query <- queryArray) {
  spark.sql(query).show(false)
}

which should give you

+----+----+----+------+
|col1|col2|col3|status|
+----+----+----+------+
|a1  |a2  |a3  |clean |
|    |b2  |b3  |dirty |
|c1  |c2  |c3  |clean |
|d1  |d2  |d3  |clean |
+----+----+----+------+

By now you should know how to proceed further. I hope this answer is helpful.
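One way to proceed, sketched under the assumption that you want to isolate the flagged rows rather than just print everything (the filter and the commented output path are illustrative, not part of the original answer):

import org.apache.spark.sql.functions.col

for (query <- queryArray) {
  // Keep only the rows the generated query labelled dirty
  val dirty = spark.sql(query).filter(col("status") === "dirty")
  dirty.show(false)
  // dirty.write.mode("overwrite").csv("dirty_rows") // hypothetical output path
}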

The complete program, with an additional gt>3 length check in the udf:

package com.incedo.pharma

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object objValidation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("columncheck")
      .master("local[*]")
      .getOrCreate()

    // Validation config: one row per table with its checks and target columns
    val df = spark.read.format("com.databricks.spark.csv")
      .option("header", true)
      .option("delimiter", ",")
      .load("tablecolcheck.csv")
    println("validation dataframe :::::")
    df.show()

    val tableNameArray = df.select(col("tablename")).collect().toSeq
    val dataframeWithQuery = df.withColumn("query",
      createQueryUdf(df("tablename"), df("Validation Checks"), df("cols")))
    println("dataframeWithQuery ---------------------")
    dataframeWithQuery.show(false)

    tableNameArray.foreach(tableArray => {
      // Load each table listed in the config, e.g. AttendeeTable.csv
      val attendeeDF = spark.read.format("com.databricks.spark.csv")
        .option("header", true)
        .option("delimiter", ",")
        .load(tableArray.getString(0) + ".csv")
      println("AttendeeDF ::::")
      attendeeDF.show(false)

      // Register the view under the exact name the generated query selects from
      attendeeDF.createOrReplaceTempView(tableArray.getString(0))

      val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))
      println("queryArray ----" + queryArray.toSeq)
      for (query <- queryArray) {
        spark.sql(query).show(false)
      }
    })
  }

  def createQueryUdf = udf((table: String, logic: String, cols: String) => {
    "select *, case when " +
      cols.split(",")
        .map(_.trim)
        .map(x => logic.split(",")
          .map(_.trim.toLowerCase)
          .map {
            case y if (y == "isempty") => s"$x like ''"     // emulate isEmpty via like ''
            case y if (y == "gt>3")    => s"length($x) > 3" // extra length check
            case y => s"$y($x)"                             // e.g. isnull(col1)
          }.mkString(" or "))
        .mkString(" or ") +
      s" then 'dirty' else 'clean' end as status from $table"
  })
}
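For reference, the two CSV files the program expects would look roughly like this (inferred from the column names and file names in the code; the exact contents are assumptions):

tablecolcheck.csv:

tablename,Validation Checks,cols
AttendeeTable,"isEmpty,isNull","col1,col2,col3"

AttendeeTable.csv:

col1,col2,col3
a1,a2,a3
,b2,b3
c1,c2,c3
d1,d2,d3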
