在火花UDF中操纵数据框



我有一个UDF,可以过滤并从数据框架中选择值,但它符合"对象不是可序列化"错误。下面的详细信息。

假设我有一个dataframe df1,其列带有名称(" id"," y1"," y2"," y3"," y4"," y4"," y5"," y6"," y6"," y7"," y8"," y8"," y9"," y10"(。我希望根据另一个数据框架DF2的匹配" ID"one_answers"值"总和" Y"列的子集。我尝试了以下内容:

val y_list = ("Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10").map(c => col(c))
def udf_test(ID: String, value: Int): Double = {
  df1.filter($"ID" === ID).select(y_list:_*).first.toSeq.toList.take(value).foldLeft(0.0)(_+_)
}
sqlContext.udf.register("udf_test", udf_test _)
val df_result = df2.withColumn("Result", callUDF("udf_test", $"ID", $"Value"))

这给了我形式的错误:

java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: Y1)

我抬起头来,意识到Spark列无法序列化。我想知道:

1(有什么方法可以在UDF中操纵数据框架吗?

2(如果没有,那么实现上述操作类型的最佳方法是什么?我的真实情况比这更复杂。它要求我根据大数据框中的某些列从多个小型数据框中选择值,并将值重新计算到大数据框架中。

我正在使用Spark 1.6.3。谢谢!

您无法在UDF中使用数据集操作。UDF只能在现有列上进行操纵并生成一个结果列。它不能过滤数据集或进行聚合,但可以在过滤器中使用。UDAF也可以汇总值。

相反,您可以使用.as[SomeCaseClass]从数据框架制作数据集,并使用过滤器中的正常,强烈键入的功能,映射,减少。

编辑:如果您想在smalldfs列表中使用每个小型DF加入BigDF,则可以这样做:

import org.apache.spark.sql.functions._
val bigDF = // some processing
val smallDFs = Seq(someSmallDF1, someSmallDF2)
val joined = smallDFs.foldLeft(bigDF)((acc, df) => acc.join(broadcast(df), "join_column"))

broadcast是添加广播提示的功能

1(不,您只能在UDFS

中使用普通的Scala代码

2(如果您正确解释了代码,则可以以:

来实现目标
df2
  .join(
    df1.select($"ID",y_list.foldLeft(lit(0))(_ + _).as("Result")),Seq("ID")
  )
import org.apache.spark.sql.functions._
val events = Seq (
(1,1,2,3,4),
(2,1,2,3,4),
(3,1,2,3,4),
(4,1,2,3,4),
(5,1,2,3,4)).toDF("ID","amt1","amt2","amt3","amt4")
var prev_amt5=0
var i=1
def getamt5value(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) : Int = {  
  if(i==1){
    i=i+1
    prev_amt5=0
  }else{
    i=i+1
  }
  if (ID == 0)
  {
    if(amt1==0)
    {
      val cur_amt5= 1
      prev_amt5=cur_amt5
      cur_amt5
    }else{
      val cur_amt5=1*(amt2+amt3)
      prev_amt5=cur_amt5
      cur_amt5
    }
  }else if (amt4==0 || (prev_amt5==0 & amt1==0)){
    val cur_amt5=0
    prev_amt5=cur_amt5
    cur_amt5
  }else{
    val cur_amt5=prev_amt5 +  amt2 + amt3 + amt4
    prev_amt5=cur_amt5
    cur_amt5
  }
}
val getamt5 = udf {(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) =>            
   getamt5value(ID,amt1,amt2,amt3,amt4)    
}
myDF.withColumn("amnt5", getamt5(myDF.col("ID"),myDF.col("amt1"),myDF.col("amt2"),myDF.col("amt3"),myDF.col("amt4"))).show()

最新更新