我有一个scala列表对象,它具有与spark数据框架列相关的所有操作的递归定义。例如,
操作(C1 - C2) + ((C3 - C4)- (C5 - c6))
是由下一个scala列表:
List("addition", List("substraction",List("C1","C2")),
List("substraction",
List("substraction",List("C3","C4")),
List("substraction"), List("C5","C6"))
)
其中"C1",...,"C5"
为spark dataframes列的名称。
我想定义一个递归scala函数,它给我最后的列结果。
有人知道怎么做吗?
您定义操作的方式非常奇怪。可以在列表中封装列名操作数,但不能封装复杂操作数。因此你可以有两个或三个元素列表。你如何定义(A + (B-C))
?我会从修复这个问题开始,然后像这样写你的操作(每个列表3个元素):
val list = List("addition",
List("substraction","C1","C2"),
List("substraction",
List("substraction","C3","C4"),
List("substraction", "C5","C6")
)
)
或者像这样(每个列表2个元素):
val list = List("addition", List(
List("substraction", List("C1","C2")),
List("substraction", List(
List("substraction", List("C3","C4")),
List("substraction", List("C5","C6"))
)))
)
第二个版本要冗长得多,让我们选择第一个版本并编写递归函数:
def operation_to_col(operation : Any) : Column = {
operation match {
case x : String => col(x)
case List("addition", s1 : Any, s2 : Any) =>
operation_to_col(s1) + operation_to_col(s2)
case List("substraction", s1 : Any, s2 : Any) =>
operation_to_col(s1) + operation_to_col(s2)
}
}
首先,我要更改操作的定义。例如,
操作(C1 - C2) + ((C3 - C4)- (C5 - c6))
由下一个scala列表定义:
val list = List("addition",
List("substraction","C1","C2"),
List("substraction",
List("substraction","C3","C4"),
List("substraction", "C5","C6")
))
我将为示例创建一个数据框架:
val data = Seq((1000, 1, 2,3,4,5), (2000,1,2,3,4,5), (3000,1,2,3,4,5))
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF("C1","C2","C3","C4","C5","C6")
允许的操作列表为:
val operations=List("addition","subtraction","multiplication","division")
我创建了下一个Map对象来关联操作和它们的符号:
val oprSimbols:Map[String,String] = Map("addition"->"+", "substraction"-> "-", "multiplication"->"*","division"->"/")
最后,我定义了解决问题的函数:
def operation_to_col(df: DataFrame,oprSimbols: Map[String,String],
operations:List[String], list : Any) : DataFrame = {
list match {
case x if operations.contains(x.toString) => df.select(col(x.toString))
case List(oprName:String,x:String, y:String) =>{
val sym = oprSimbols(oprName)
val exprOpr = List(x,sym,y).mkString(" ")
df.selectExpr(exprOpr)}
case List(oprName:String, s1 : Any, s2 : Any) =>{
val df1 = operation_to_col(df,oprSimbols,operations,s1)
val df2 = operation_to_col(df,oprSimbols,operations,s2)
val sym = oprSimbols(oprName)
val exprOpr = List(df1.columns(0),sym,df2.columns(0)).mkString(" ")
df.selectExpr(exprOpr)
}
}}
我们可以检查一下:
operation_to_col(df,oprSimbols, operations, list )