我有一个UDF,可以过滤并从数据框架中选择值,但它符合"对象不是可序列化"错误。下面的详细信息。
假设我有一个dataframe df1,其列带有名称(" id"," y1"," y2"," y3"," y4"," y4"," y5"," y6"," y6"," y7"," y8"," y8"," y9"," y10"(。我希望根据另一个数据框架DF2的匹配" ID"one_answers"值"总和" Y"列的子集。我尝试了以下内容:
val y_list = ("Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10").map(c => col(c))
def udf_test(ID: String, value: Int): Double = {
df1.filter($"ID" === ID).select(y_list:_*).first.toSeq.toList.take(value).foldLeft(0.0)(_+_)
}
sqlContext.udf.register("udf_test", udf_test _)
val df_result = df2.withColumn("Result", callUDF("udf_test", $"ID", $"Value"))
这给了我形式的错误:
java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: Y1)
我抬起头来,意识到Spark列无法序列化。我想知道:
1(有什么方法可以在UDF中操纵数据框架吗?
2(如果没有,那么实现上述操作类型的最佳方法是什么?我的真实情况比这更复杂。它要求我根据大数据框中的某些列从多个小型数据框中选择值,并将值重新计算到大数据框架中。
我正在使用Spark 1.6.3。谢谢!
您无法在UDF中使用数据集操作。UDF只能在现有列上进行操纵并生成一个结果列。它不能过滤数据集或进行聚合,但可以在过滤器中使用。UDAF也可以汇总值。
相反,您可以使用.as[SomeCaseClass]
从数据框架制作数据集,并使用过滤器中的正常,强烈键入的功能,映射,减少。
编辑:如果您想在smalldfs列表中使用每个小型DF加入BigDF,则可以这样做:
import org.apache.spark.sql.functions._
val bigDF = // some processing
val smallDFs = Seq(someSmallDF1, someSmallDF2)
val joined = smallDFs.foldLeft(bigDF)((acc, df) => acc.join(broadcast(df), "join_column"))
broadcast
是添加广播提示的功能
1(不,您只能在UDFS
中使用普通的Scala代码2(如果您正确解释了代码,则可以以:
来实现目标df2
.join(
df1.select($"ID",y_list.foldLeft(lit(0))(_ + _).as("Result")),Seq("ID")
)
import org.apache.spark.sql.functions._
val events = Seq (
(1,1,2,3,4),
(2,1,2,3,4),
(3,1,2,3,4),
(4,1,2,3,4),
(5,1,2,3,4)).toDF("ID","amt1","amt2","amt3","amt4")
var prev_amt5=0
var i=1
def getamt5value(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) : Int = {
if(i==1){
i=i+1
prev_amt5=0
}else{
i=i+1
}
if (ID == 0)
{
if(amt1==0)
{
val cur_amt5= 1
prev_amt5=cur_amt5
cur_amt5
}else{
val cur_amt5=1*(amt2+amt3)
prev_amt5=cur_amt5
cur_amt5
}
}else if (amt4==0 || (prev_amt5==0 & amt1==0)){
val cur_amt5=0
prev_amt5=cur_amt5
cur_amt5
}else{
val cur_amt5=prev_amt5 + amt2 + amt3 + amt4
prev_amt5=cur_amt5
cur_amt5
}
}
val getamt5 = udf {(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) =>
getamt5value(ID,amt1,amt2,amt3,amt4)
}
myDF.withColumn("amnt5", getamt5(myDF.col("ID"),myDF.col("amt1"),myDF.col("amt2"),myDF.col("amt3"),myDF.col("amt4"))).show()