I have a DataFrame with a column named "QUERY" that contains SELECT statements. I want to execute each query and create a new column holding the actual result it returns from TempView.
+--------------+-----------+-----+----------------------------------------+
|DIFFCOLUMNNAME|DATATYPE |ISSUE|QUERY |
+--------------+-----------+-----+----------------------------------------+
|Firstname |StringType |YES |Select Firstname from TempView limit 1 |
|LastName |StringType |NO |Select LastName from TempView limit 1 |
|Designation |StringType |YES |Select Designation from TempView limit 1|
|Salary |IntegerType|YES |Select Salary from TempView limit 1 |
+--------------+-----------+-----+----------------------------------------+
I get a type-mismatch error: required String, found Column. Do I need a UDF here? I don't know how to write and use one. Please advise. This is what I tried:
DF.withColumn("QueryResult", spark.sql(col("QUERY")))
TempView is a temporary view I have already created with all the required columns. The expected final DataFrame, with the new QUERYRESULT column added, would look like this:
+--------------+-----------+-----+----------------------------------------+------------+
|DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY                                   |QUERY RESULT|
+--------------+-----------+-----+----------------------------------------+------------+
|Firstname |StringType |YES |Select Firstname from TempView limit 1 | Bunny |
|LastName |StringType |NO |Select LastName from TempView limit 1 | Gummy |
|Designation |StringType |YES |Select Designation from TempView limit 1| Developer |
|Salary |IntegerType|YES |Select Salary from TempView limit 1 | 100 |
+--------------+-----------+-----+----------------------------------------+------------+
If the number of queries is limited, you can collect them, execute each one, and join the results back to the original queries DataFrame (Kieran's answer is faster, but mine has an example):
import org.apache.spark.sql.Encoders
import spark.implicits._ // for toDF on local Seqs

val queriesDF = Seq(
  ("Firstname", "StringType", "YES", "Select Firstname from TempView limit 1 "),
  ("LastName", "StringType", "NO", "Select LastName from TempView limit 1 "),
  ("Designation", "StringType", "YES", "Select Designation from TempView limit 1"),
  ("Salary", "IntegerType", "YES", "Select Salary from TempView limit 1 ")
).toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY")

val data = Seq(
  ("Bunny", "Gummy", "Developer", 100)
).toDF("Firstname", "LastName", "Designation", "Salary")

data.createOrReplaceTempView("TempView")

// Collect all distinct queries to the driver and evaluate each one.
// Use get(0).toString for the result so non-string columns
// (e.g. the integer Salary) work as well.
val queries = queriesDF.select("QUERY").distinct().as(Encoders.STRING).collect().toSeq
val queryResults = queries.map(q => (q, spark.sql(q).first().get(0).toString))
val queryResultsDF = queryResults.toDF("QUERY", "QUERY RESULT")

// Join the original queries with their results
queriesDF.alias("queriesDF")
  .join(queryResultsDF, Seq("QUERY"))
  .select("queriesDF.*", "QUERY RESULT")
Output:
+----------------------------------------+--------------+-----------+-----+------------+
|QUERY |DIFFCOLUMNNAME|DATATYPE |ISSUE|QUERY RESULT|
+----------------------------------------+--------------+-----------+-----+------------+
|Select Firstname from TempView limit 1 |Firstname |StringType |YES |Bunny |
|Select LastName from TempView limit 1 |LastName |StringType |NO |Gummy |
|Select Designation from TempView limit 1|Designation |StringType |YES |Developer |
|Select Salary from TempView limit 1 |Salary |IntegerType|YES |100 |
+----------------------------------------+--------------+-----------+-----+------------+
Assuming you don't have that many 'query rows', just collect the results to the driver with df.collect(), then map over the queries in plain Scala.
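A minimal sketch of that collect-and-map approach, assuming the same queriesDF and TempView as above and an active SparkSession named spark (the variable names here are illustrative):

```scala
import org.apache.spark.sql.Row
import spark.implicits._

// Pull the small queries DataFrame to the driver as plain Scala rows,
// run each embedded query on the cluster, and keep its single value.
val resultRows = queriesDF.collect().map {
  case Row(diff: String, dt: String, issue: String, query: String) =>
    val result = spark.sql(query).first().get(0).toString
    (diff, dt, issue, query, result)
}

// Build the final DataFrame with the extra QUERY RESULT column
val finalDF = resultRows.toSeq
  .toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY", "QUERY RESULT")
```

Since spark.sql can only be called from the driver, this collect-first pattern (rather than a UDF, which runs on executors) is the simplest way to execute per-row SQL.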