Spark中有很多关于Task is not serializable
的问题。然而,这种情况似乎很特殊。
我创建了一个类:
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
.persist()
val sc = allEs.sparkContext
val centroids = sc.broadcast(m.clusterCenters)
[...]
类定义如下方法:
private def centroidDistances(v: Vector): Array[Double] = {
centroids.value.map(c => (centroids.value.indexOf(c), Vectors.sqdist(v, c)))
.sortBy(_._1)
.map(_._2)
}
但是,当类被调用时,会抛出一个Task is not serializable
异常。
奇怪的是,在类Neighbours
的头一个微小的变化足以解决这个问题。我没有创建用于广播的val sc: SparkContext
,只是内联了创建Spark上下文的代码:
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
.setName("embeddings")
.persist()
val centroids = allEmbeddings.sparkContext(m.clusterCenters)
[...]
我的问题是:第二种变体有什么不同?第一个出现了什么问题?直觉上,这应该只是语法糖,这是Spark中的错误吗?
我在Hadoop/Yarn集群上使用Spark 1.4.1
当你定义
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
...
val sc = allEmbeddings.sparkContext
val centroids = sc.broadcast(m.clusterCenters)
...
}
你已经把sc
变成了一个类变量,这意味着它可以从Neighbours
的实例中访问,例如neighbours.sc
。这意味着sc
需要是可序列化的,而事实并非如此。
内联代码时,只有centroids
的最终值需要序列化。centroids
的类型是Broadcast
,它是可序列化的。