Scala Spark数据帧:即使有广播变量,任务也不可序列化异常



这是有效的(df:dataframe)

val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= 10 => r }

这不是

val start=10
val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= start => r }

我试着使用广播变量,但即使这样也不起作用

 val start=sc.broadcast(1)
 val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= start.value => r }

我收到任务不可序列化异常。有人能解释为什么即使有广播变量也会失败吗。

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$collect$2.apply(RDD.scala:959)
at org.apache.spark.rdd.RDD$$anonfun$collect$2.apply(RDD.scala:958)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:958)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:172)
at $iwC

您正在使用的基本构造看起来很坚固。下面是执行操作的类似代码片段。请注意,它使用broadcast,并在map方法中使用广播值,这与您的代码类似。

scala> val dat = sc.parallelize(List(1,2,3))
dat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val br = sc.broadcast(10)
br: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(2)
scala> dat.map(br.value * _)
res2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at map at <console>:29
scala> res2.collect
res3: Array[Int] = Array(10, 20, 30)

因此,这可能有助于你验证你的一般方法。

我怀疑您的问题与脚本中的其他变量有关。试着在新的火花壳会话中首先剥离所有内容,并通过消除过程找出罪魁祸首。

相关内容

  • 没有找到相关文章

最新更新