如何在spark-sql中基于条件更新行



我正在研究spark-sql数据准备。

我面临的问题是得到sql查询的结果后。如何根据If-then-else条件更新行?

我在做什么

  val table_join = sqlContext.sql(""" SELECT a.*,b.col as someCol
  from table1 a LEFT JOIN table2 b 
  on a.ID=b.ID """)
table_join.registerTempTable("Table_join")

现在,当我最终加入表是在df格式。我应该如何更新行?

//Final filtering operation
val final_filtered_table = table_join.map{ case record=> 
  if(record.getAs[String]("col1") == "Y" && record.getAs[String]("col2") == "") record.getAs[String]("col2")="UNKNOWN" 
  else if (record.getAs[String]("col1") == "N") record("col1")=""
  else record
}

在上面的映射中,if语法工作正常,但是当我应用更新条件来修改时,它给了我错误。

但是为什么下面的查询是工作的

 if(record.getAs[String]("col1") == "Y" && record.getAs[String]("col2") == "") "UNKNOWN" 

但是当我把"UNKNOWN"变成record.getAs[String]("col2")="UNKNOWN"的时候它给了我在.getAs处的错误

我尝试的另一种方法是:

val final_filtered_sql = table_join.map{row => 
  if(row.getString(6) == "Y" && row.getString(33) == "") row.getString(6) == "UNKNOWN" 
  else if(row.getString(6) == "N") row.getString(6) == ""
  else row
}

这是工作,但这是正确的方法,因为我不应该用他们的no来调用列,而是用他们的名字。我应该遵循什么方法来获取列的名称,然后更新??

请帮我解决这个问题。我应该使用什么语法来根据spark-sql

中的dataframe条件更新行

record.getAs[String]("col2")="UNKNOWN"将无法工作,因为record.getAs[String](NAME)将返回没有=方法的String,并且为字符串分配新值没有意义。

DataFrame记录没有任何setter方法,因为DataFrames是基于RDD的不可变集合,这意味着你不能改变它们的状态,这就是你在这里要做的。

一种方法是在table_join上使用selectExpr创建新的DataFrame,并使用SQLif/else逻辑放在那里。

相关内容

  • 没有找到相关文章

最新更新