I want to programmatically take a given list of fields and, for some of them, select the column and also pass it to another function that returns a case class of (String, String). So far I have:
val myList = Seq(("a", "b", "c", "d"), ("aa", "bb", "cc","dd"))
val df = myList.toDF("col1","col2","col3","col4")
val fields = "col1,col2"
val myDF = df.select(df.columns.map { c =>
  if (fields.contains(c))
    df.col(c) && someUDFThatReturnsAStructTypeOfStringAndString(df.col(c)).alias(s"${c}_processed")
  else
    df.col(c)
}: _*)
This gives me an exception:
org.apache.spark.sql.AnalysisException: cannot resolve '(col1 AND UDF(col1))' due to data type mismatch: differing types in '(col1 AND UDF(col1))' (string and struct<STRING1:string,STRING2:string>)
What I want to select is:

col1 | <col1.String> | col2 | <col2.String> | col3 | col4
"a"  | <"a1","a2">   | "b"  | <"b1","b2">   | "c"  | "d"
I ended up using df.selectExpr and bundling a bunch of expressions together:
import spark.implicits._
import org.apache.spark.sql.functions.expr

// The UDF must be registered for expr() to resolve it by name, e.g.
// spark.udf.register("someUDFThatReturnsAStructTypeOfStringAndString", myUdf)
val fields = "col1,col2".split(",")

// Parse each selected field into a struct column, keeping all original columns.
val exprToSelect = df.columns.filter(c => fields.contains(c))
  .map(c => s"someUDFThatReturnsAStructTypeOfStringAndString($c) as ${c}_parsed") ++ df.columns

// Error rows: ANY parsed field has a String1 longer than 1.
val exprToFilter = df.columns.filter(c => fields.contains(c))
  .map(c => s"length(${c}_parsed.String1) > 1").reduce(_ + " OR " + _)

// Valid rows: EVERY parsed field has a String1 shorter than 1.
val exprToFilter2 = df.columns.filter(c => fields.contains(c))
  .map(c => s"(length(${c}_parsed.String1) < 1)").reduce(_ + " AND " + _)

// Valid output: swap each selected column for its parsed String2.
val exprToSelectValid = df.columns.filter(c => fields.contains(c))
  .map(c => s"${c}_parsed.String2 as ${c}") ++ df.columns.filterNot(c => fields.contains(c))

// Error output: concatenate the offending String1 values, keep all columns.
val exprToSelectInvalid = Array("concat(" + df.columns.filter(c => fields.contains(c))
  .map(c => s"${c}_parsed.String1").mkString(", ") + ") as String1") ++ df.columns

val parsedDF = df.select(exprToSelect.map(expr): _*)
val validDF  = parsedDF.filter(exprToFilter2).select(exprToSelectValid.map(expr): _*)
val errorDF  = parsedDF.filter(exprToFilter).select(exprToSelectInvalid.map(expr): _*)
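Since a malformed expression string only fails once it reaches expr(), it helps to print the reduced predicates while debugging. A plain-Scala sketch of what the two filter strings expand to for the same two fields:

```scala
val fields = Seq("col1", "col2")

// OR across fields: a row is an error row if ANY parsed field misbehaves.
val errorPredicate = fields
  .map(c => s"length(${c}_parsed.String1) > 1")
  .reduce(_ + " OR " + _)

// AND across fields: a row is valid only if EVERY parsed field is clean.
val validPredicate = fields
  .map(c => s"(length(${c}_parsed.String1) < 1)")
  .reduce(_ + " AND " + _)

println(errorPredicate)
println(validPredicate)
```

Note that the two predicates are De Morgan complements only if the length test is exhaustive; a String1 of length exactly 1 falls into neither frame, so check the boundaries against your UDF's contract.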