我使用的是Apache Zeppelin笔记本。所以spark基本上是以交互模式运行的。我不能在这里使用闭包变量,因为齐柏林在试图序列化整个段落(更大的闭包)时会抛出org.apache.spark.SparkException: Task not serializable
。
如果没有闭包方法,我唯一的选择就是将map作为列传递给UDF。
我有一个从parted RDD收集的以下映射:
final val idxMap = idxMapRdd.collectAsMap
在这里的一个spark转换中使用:
def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = {
predictions.array.map(idxMap.getOrElse(_, "Other"))
}
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions)}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap)))
但是使用lit(idxMap)
语句,我得到了以下错误:
java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap
所以我试着从下面创建列:
val colmap = map(idxMapArr.map(lit _): _*)
但是出现以下错误:
<console>:139: error: type mismatch;
found : Iterable[org.apache.spark.sql.Column]
required: Seq[org.apache.spark.sql.Column]
val colmap = map(idxMapArr.map(lit _): _*)
闭包方法(为了完整性):
def predictionStrUDF2( idxMapArr: scala.collection.Map[Double,String]) = {
udf((predictions: WrappedArray[Double] ) => labelStr(predictions, idxMapArr))
}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions")))
编译,但当我做cvmlPredictionsStr.show
我得到以下。我想这是由于齐柏林飞艇的互动性
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
... 62 elided
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS
Serialization stack:
- object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: com.github.fommil.netlib.F2jBLAS@294770d3)
- field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS)
- object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f)
- field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator)
- object (class $iw, $iw@556a6aed)
- field (class: $iw, name: $iw, type: class $iw)
问题的标题是关于Spark udf的,但这里真正的问题似乎是如何避免一些交互式环境所表现出的闭包序列化问题。
从你对问题的描述来看,如果直接在你的一个笔记本单元格中执行下面的操作,听起来就不工作了:
val x = 5
sc.parallelize(1 to 10).filter(_ > x).collect()
这可能是因为x是cell对象的类成员;当lambda捕获x时,它尝试序列化整个单元格对象。cell对象是不可序列化的,结果是一个混乱的异常。使用包装器对象可以避免这个问题。注意,这可能是一种更圆滑的方式来声明这个包装器(也许仅仅在大括号内嵌套就足够了)。
object Wrapper {
def f() {
val x = 5
sc.parallelize(1 to 10).filter(_ > x).collect()
}
}
Wrapper.f()
在解决了这个问题之后,你可能还有问题,但是目前这个问题涉及了太多不同的子主题。关于闭包序列化问题的另一个解释可以在这里找到。