Validation dataframe:
+---------+---------------------------+-------------------------+
|dataframe|Validation Checks |cols |
+---------+---------------------------+-------------------------+
|Attendee |isEmpty,IsNull |col1,col2,col3 |
+---------+---------------------------+-------------------------+
Attendee dataframe:
col1 col2 col3
a1   a2   a3
     b2   b3
c1   c2   c3
d1   d2   d3
Expected result dataframe:
col1 col2 col3 status
a1   a2   a3   clean
     b2   b3   dirty
c1   c2   c3   clean
d1   d2   d3   clean
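The intended rule, a row is "dirty" when any of the checked columns is null or empty, otherwise "clean", can be sketched in plain Scala (this is only an illustration of the rule, not the Spark solution):

```scala
// A row is "dirty" if any checked value is null or blank, otherwise "clean".
def status(row: Seq[String]): String =
  if (row.exists(v => v == null || v.trim.isEmpty)) "dirty" else "clean"

val rows = Seq(
  Seq("a1", "a2", "a3"),
  Seq("",   "b2", "b3"),  // missing col1 -> dirty
  Seq("c1", "c2", "c3"),
  Seq("d1", "d2", "d3")
)
rows.map(status)  // clean, dirty, clean, clean
```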
Code used:
var columns = df.columns //struct(df.columns map col: _*)
val colDF = df.select(col("dataframe"))
var tablename = colDF.head().toSeq
val checkDF = df.select(col("Validation Checks"))
val opsColDF = df.select(col("cols"))
val opsColumn = opsColDF.columns
println("opsColumn :::" + opsColumn)
If you have a dataframe
+---------+-----------------+--------------+
|dataframe|Validation Checks|cols |
+---------+-----------------+--------------+
|Attendee |isEmpty,isNull |col1,col2,col3|
+---------+-----------------+--------------+
then a SQL query should be built from the column values. I used a udf
function to create another column holding a valid query
import org.apache.spark.sql.functions._

def createQueryUdf = udf((table: String, logic: String, cols: String) => {
  "select *, case when " +
    cols.split(",")
      .map(_.trim)
      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map {
          case y if (y == "isempty") => s"$x like ''"
          case y => s"$y($x)"
        }.mkString(" or "))
      .mkString(" or ") +
    s" then 'dirty' else 'clean' end as status from $table"
})
val dataframeWithQuery = df.withColumn("query", createQueryUdf(col("dataframe"), col("Validation Checks"), col("cols")))
so dataframeWithQuery
would be
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dataframe|Validation Checks|cols |query |
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Attendee |isEmpty,isNull |col1,col2,col3|select *, case when col1 like '' or isnull(col1) or col2 like '' or isnull(col2) or col3 like '' or isnull(col3) then 'dirty' else 'clean' end as status from Attendee|
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
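Since the query-building inside the udf is plain string manipulation, it can be checked without Spark at all. Extracted as an ordinary function (same body as the udf above):

```scala
// Plain-Scala version of the udf body: builds the validation query string.
def buildQuery(table: String, logic: String, cols: String): String =
  "select *, case when " +
    cols.split(",")
      .map(_.trim)
      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map {
          case "isempty" => s"$x like ''"   // empty-string check
          case y         => s"$y($x)"       // e.g. isnull(col1)
        }.mkString(" or "))
      .mkString(" or ") +
    s" then 'dirty' else 'clean' end as status from $table"

buildQuery("Attendee", "isEmpty,isNull", "col1,col2,col3")
// → the query string shown in the table above
```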
Now you can pick the valid queries to run against the dataframes, but before that, every dataframe has to be registered, e.g.
attendee.createOrReplaceTempView("Attendee")
Then you just collect
the query column and loop over it to apply the query statements
val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))
for(query <- queryArray){
spark.sql(query).show(false)
}
which should give you
+----+----+----+------+
|col1|col2|col3|status|
+----+----+----+------+
|a1 |a2 |a3 |clean |
| |b2 |b3 |dirty |
|c1 |c2 |c3 |clean |
|d1 |d2 |d3 |clean |
+----+----+----+------+
By now you should know how to proceed further. I hope the answer is helpful
package com.incedo.pharma

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object objValidation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("columncheck")
      .master("local[*]")
      .getOrCreate()

    // validation rules csv: tablename, Validation Checks, cols
    val df = spark.read.format("com.databricks.spark.csv")
      .option("header", true)
      .option("delimiter", ",")
      .load("tablecolcheck.csv")
    println("validation dataframe :::::")
    df.show()

    var AttendeeDF = df
    val tableNameArray = df.select(col("tablename")).collect().toSeq

    // add the generated sql query as a new column
    val dataframeWithQuery = df.withColumn("query",
      createQueryUdf(df("tablename"), df("Validation Checks"), df("cols")))
    println("dataframeWithQuery ---------------------")
    dataframeWithQuery.show(false)

    tableNameArray.foreach(tableArray => {
      // load each table from its csv file, e.g. AttendeeTable.csv
      AttendeeDF = spark.read.format("com.databricks.spark.csv")
        .option("header", true)
        .option("delimiter", ",")
        .load(tableArray.get(0) + ".csv")
      println("AttendeeDF ::::")
      AttendeeDF.show(false)

      // register under the table name the generated query refers to
      AttendeeDF.createOrReplaceTempView(tableArray.get(0).toString)

      val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))
      println("queryArray ----" + queryArray.toSeq)
      for (query <- queryArray) {
        spark.sql(query).show(false)
      }
    })
  }

  def createQueryUdf = udf((table: String, logic: String, cols: String) => {
    "select *, case when " +
      cols.split(",")
        .map(_.trim)
        .map(x => logic.split(",")
          .map(_.trim.toLowerCase)
          .map {
            case y if (y == "isempty") => s"$x like ''"
            case y if (y == "gt>3") => s"length($x) > 3"
            case y => s"$y($x)"
          }.mkString(" or "))
        .mkString(" or ") +
      s" then 'dirty' else 'clean' end as status from $table"
  })
}
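The full listing extends the pattern match with an extra "gt>3" check that renders as a SQL length test. Isolating just that branch makes the mapping from check name to SQL fragment easy to see (the function name here is illustrative):

```scala
// Renders one check name against one column as a SQL predicate fragment.
def renderCheck(check: String, column: String): String =
  check.trim.toLowerCase match {
    case "isempty" => s"$column like ''"       // empty-string check
    case "gt>3"    => s"length($column) > 3"   // value longer than 3 chars
    case other     => s"$other($column)"       // fall back to sql function call
  }

renderCheck("gt>3", "col1")    // length(col1) > 3
renderCheck("isNull", "col2")  // isnull(col2)
```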