我正在研究spark-sql数据准备。
我面临的问题是得到sql查询的结果后。如何根据If-then-else条件更新行?
我在做什么
val table_join = sqlContext.sql(""" SELECT a.*,b.col as someCol
from table1 a LEFT JOIN table2 b
on a.ID=b.ID """)
table_join.registerTempTable("Table_join")
现在,当我最终加入表是在df格式。我应该如何更新行?
//Final filtering operation
val final_filtered_table = table_join.map{ case record=>
if(record.getAs[String]("col1") == "Y" && record.getAs[String]("col2") == "") record.getAs[String]("col2")="UNKNOWN"
else if (record.getAs[String]("col1") == "N") record("col1")=""
else record
}
在上面的映射中,if语法工作正常,但是当我应用更新条件来修改时,它给了我错误。
但是为什么下面的查询是工作的
if(record.getAs[String]("col1") == "Y" && record.getAs[String]("col2") == "") "UNKNOWN"
但是当我把"UNKNOWN"
变成record.getAs[String]("col2")="UNKNOWN"
的时候它给了我在.getAs
处的错误
我尝试的另一种方法是:
val final_filtered_sql = table_join.map{row =>
if(row.getString(6) == "Y" && row.getString(33) == "") row.getString(6) == "UNKNOWN"
else if(row.getString(6) == "N") row.getString(6) == ""
else row
}
这是工作,但这是正确的方法,因为我不应该用他们的no来调用列,而是用他们的名字。我应该遵循什么方法来获取列的名称,然后更新??
请帮我解决这个问题。我应该使用什么语法来根据spark-sql
dataframe
条件更新行 record.getAs[String]("col2")="UNKNOWN"
将无法工作,因为record.getAs[String](NAME)
将返回没有=
方法的String
,并且为字符串分配新值没有意义。
DataFrame
记录没有任何setter方法,因为DataFrames
是基于RDD
的不可变集合,这意味着你不能改变它们的状态,这就是你在这里要做的。
table_join
上使用selectExpr
创建新的DataFrame
,并使用SQL
将if/else
逻辑放在那里。