Executing a Spark SQL query inside a withColumn clause in Spark Scala



I have a DataFrame with a column named "QUERY" that contains select statements. I want to execute each of these queries and create a new column holding the actual result from TempView.

+--------------+-----------+-----+----------------------------------------+
|DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY                                   |
+--------------+-----------+-----+----------------------------------------+
|Firstname     |StringType |YES  |Select Firstname from TempView  limit 1 |
|LastName      |StringType |NO   |Select LastName from TempView  limit 1  |
|Designation   |StringType |YES  |Select Designation from TempView limit 1|
|Salary        |IntegerType|YES  |Select Salary from TempView    limit 1  |
+--------------+-----------+-----+----------------------------------------+

I am getting a type mismatch error: required String, found Column. Do I need to use a UDF here? I don't know how to write or use one. Please advise.

DF.withColumn("QueryResult", spark.sql(col("QUERY")))
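
For context: spark.sql expects a plain String and runs on the driver, while col("QUERY") is a Column whose values only exist per row at execution time, hence the compile error. A UDF does not help here either, because executors cannot use the SparkSession. A minimal illustration of the type mismatch:

// SparkSession.sql has the signature: def sql(sqlText: String): DataFrame
val ok = spark.sql("Select Firstname from TempView limit 1") // a literal String compiles
// spark.sql(col("QUERY"))  // does not compile: required String, found Column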

TempView is a temporary view I have already created with all the required columns. The expected final DataFrame would look like this, with the new column QUERYRESULT added.

+--------------+-----------+-----+----------------------------------------+------------+
|DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY                                   |QUERY RESULT|
+--------------+-----------+-----+----------------------------------------+------------+
|Firstname     |StringType |YES  |Select Firstname from TempView  limit 1 | Bunny      |
|LastName      |StringType |NO   |Select LastName from TempView  limit 1  | Gummy      |
|Designation   |StringType |YES  |Select Designation from TempView limit 1| Developer  |
|Salary        |IntegerType|YES  |Select Salary from TempView    limit 1  | 100        |
+--------------+-----------+-----+----------------------------------------+------------+

If the number of queries is limited, you can collect them, execute each one, and join the results back to the original queries DataFrame (Kieran's answer is faster, but mine includes an example):

import org.apache.spark.sql.Encoders
import spark.implicits._

val queriesDF = Seq(
  ("Firstname", "StringType", "YES", "Select Firstname from TempView  limit 1 "),
  ("LastName", "StringType", "NO", "Select LastName from TempView  limit 1 "),
  ("Designation", "StringType", "YES", "Select Designation from TempView limit 1"),
  ("Salary", "IntegerType", "YES", "Select Salary from TempView limit 1 ")
).toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY")

val data = Seq(
  ("Bunny", "Gummy", "Developer", 100)
).toDF("Firstname", "LastName", "Designation", "Salary")
data.createOrReplaceTempView("TempView")

// Collect all distinct queries and evaluate each one on the driver
val queries = queriesDF.select("QUERY").distinct().as(Encoders.STRING).collect().toSeq
val queryResults = queries.map(q => (q, spark.sql(q).as(Encoders.STRING).first()))
val queryResultsDF = queryResults.toDF("QUERY", "QUERY RESULT")

// Join the original queries with their results
queriesDF.alias("queriesDF")
  .join(queryResultsDF, Seq("QUERY"))
  .select("queriesDF.*", "QUERY RESULT")
  .show(false)

Output:

+----------------------------------------+--------------+-----------+-----+------------+
|QUERY                                   |DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY RESULT|
+----------------------------------------+--------------+-----------+-----+------------+
|Select Firstname from TempView  limit 1 |Firstname     |StringType |YES  |Bunny       |
|Select LastName from TempView  limit 1  |LastName      |StringType |NO   |Gummy       |
|Select Designation from TempView limit 1|Designation   |StringType |YES  |Developer   |
|Select Salary from TempView limit 1     |Salary        |IntegerType|YES  |100         |
+----------------------------------------+--------------+-----------+-----+------------+
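
Note that distinct() means each query text is executed only once even if it appears in several rows, and that both collect() and the per-query spark.sql(...).first() calls run sequentially on the driver, so this approach only scales to a modest number of queries.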

Assuming you don't have that many 'query rows', just collect them to the driver using df.collect() and then map over the queries in plain Scala.
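
A minimal sketch of that suggestion, assuming the same queriesDF and TempView as in the example above (queryStrings, resultsDF, and finalDF are illustrative names):

import spark.implicits._

// Collect the query strings to the driver (only sensible for a small number of rows)
val queryStrings = queriesDF.select("QUERY").as[String].collect()
// Execute each query on the driver in plain Scala, keeping its single-row result
val resultsDF = queryStrings.toSeq
  .map(q => (q, spark.sql(q).first().get(0).toString))
  .toDF("QUERY", "QUERY RESULT")
// Attach the results back to the original DataFrame
val finalDF = queriesDF.join(resultsDF, Seq("QUERY"))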
