Spark UDF - 任务不可序列化异常



我正在尝试使用以下 scala 代码创建 UDF

lazy val formattedDF = df.withColumn("result_col", validateudf(df("id")))
val validateudf = udf((id: Int) => {
  if(id == 1){
     "ID IS EQUAL TO 1"
  } 
  else if(id > 1){
    validateId(id)
  }
  else{
    "NO VALID RECORDS"
  }
})
def validateId(id:Int) : String = {
   if (id > 2) {
     "ID IS GREATER THAN 2"
   }
   else {
     "VALID RECORDS"
   }
 }

当我运行此代码时,我遇到任务不可序列化异常。

有什么想法吗?谢谢。

udf被视为

一个黑匣子,它要求对传递的列进行序列化和反序列化,因此当您有内置函数的替代方案时,不建议使用 udf

使用 withColumn 调用udf函数很好,但您已经从导致问题的udf函数内部调用了另一个validateId函数。

我建议您根本不使用udf函数,因为您只需使用内置函数即可满足要求when

import org.apache.spark.sql.functions._
val formattedDF2 = df.withColumn("result_col", when($"id" === 1, lit("ID IS EQUAL TO 1")).otherwise(when($"id" > 2, lit("ID IS GREATER THAN 2")).otherwise(when($"id" > 1, lit("VALID RECORDS")).otherwise(lit("NO VALID RECORDS")))))

相关内容

  • 没有找到相关文章

最新更新