Spark: Task not serializable (Broadcast/RDD/SparkContext)



Spark中有很多关于Task is not serializable的问题。然而,这种情况似乎很特殊。

我创建了一个类:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
    .persist()
  val sc = allEs.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  [...]

类定义如下方法:

private def centroidDistances(v: Vector): Array[Double] = {
  centroids.value.map(c => (centroids.value.indexOf(c), Vectors.sqdist(v, c)))
    .sortBy(_._1)
    .map(_._2)
}

但是,当类被调用时,会抛出一个Task is not serializable异常。

奇怪的是,在类Neighbours的头一个微小的变化足以解决这个问题。我没有创建用于广播的val sc: SparkContext,只是内联了创建Spark上下文的代码:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
  .setName("embeddings")
  .persist()
  val centroids = allEmbeddings.sparkContext(m.clusterCenters)
  [...]

我的问题是:第二种变体有什么不同?第一个出现了什么问题?直觉上,这应该只是语法糖,这是Spark中的错误吗?

我在Hadoop/Yarn集群上使用Spark 1.4.1

当你定义

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  ...
  val sc = allEmbeddings.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  ...
}

你已经把sc变成了一个类变量,这意味着它可以从Neighbours的实例中访问,例如neighbours.sc。这意味着sc需要是可序列化的,而事实并非如此。

内联代码时,只有centroids的最终值需要序列化。centroids的类型是Broadcast,它是可序列化的。

相关内容

  • 没有找到相关文章

最新更新