Spark 序列化 (?) 具有特征怪异和 1.6.0 和 2.1.1 之间的差异



我知道有很多关于这个问题的问题,但我无法找到关于究竟需要序列化的内容(以及何时序列化)的系统解释......以及如何验证此要求。

考虑一下:

class Baz
trait Bar { val baz = new Baz; def bar(i: Int) = baz }
case object Foo extends Bar { def foo = sc.parallelize(1 to 1).map(bar).collect }
Foo.foo

这有效,并返回Array(null)这对任何人都有意义吗???

如果我val更改为lazy val,那么它会停止工作并抛出NotSerializableException,这是有道理的 - 它在远程端初始化baz,然后无法将其发回。 但是为什么在第一种情况下它很乐意用null代替它呢???

如果我写它,几乎,我能想到的任何其他方式 - 例如bar定义从特征移动到对象,或者用_ => baz替换bar调用 - 它也停止工作,并抱怨Task is not serializable.

返回在 trait 中定义的 val 的方法是什么,它只是将其编写为null?有什么想法吗?

更新上述行为发生在带有 Spark 2.1.1 的 scala 2.11 上。 Scala 2.10(火花 1.6.0)确实抛出了一个异常,抱怨Baz不可序列化......所以,这似乎是一种回归。

我还注意到在 Spark 1.6.0 上,这样的东西工作正常:

object Foo { def foo = sc.parallelize(1 to 1).map(bar).collect; def bar(i: Int) = i+1 } 
Foo.foo

但是在 Spark 2.1.1 上,它抱怨Foo不可序列化。为什么? 显然,序列化 lambda 也想序列化Foo,这有点道理......除了它确实以某种方式在 1.6.0 中工作,即使我让 labda 实际上引用了Foo中的其他内容:

object Foo { 
var stuff = 10 
def foo = sc.parallelize(1 to 1).map(bar).collect
def bar(i: Int) = { stuff += 1; i+1 }
} 
Foo.foo
Foo.stuff

这在 1.6.0 中工作正常,但在 2.1.1 中则不然。

所以,这里的一个问题是它在 1.6.0 中如何实际工作?我的意思是,Foo不可序列化,它如何知道另一端stuff的值?

另一个显而易见的问题是 - 为什么它在 2.1.1 中停止工作?1.6.0 行为是否存在微妙的问题,我们不应该依赖它吗? 或者它只是 2.1.1 中的一个错误?

从某种意义上说,这可能不是一个直接的答案,可以为您提供Spark中序列化问题背后的确切原因以及为什么您的用例可能会以这种或那种方式工作,但是...让我对此作一些说明。

SparkContext是一切发生的地方(或者至少是它开始的地方)。在方法中,您可以找到干净的方法:

private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}

引用它的 scaladoc,你应该得到足够的关于 Spark 如何进行序列化验证的信息:

清理闭包以使其准备好序列化并发送到任务(删除$outer中未引用的变量,更新 REPL 变量)

如果设置了checkSerializableclean还将主动检查f是否可序列化,如果不是,则引发SparkException

搜索使用该方法的所有位置可能会揭示代码可能有效或无效的原因。这就像将clean方法应用于代码一样简单。

您也可以从RDD.map运算符开始,您可以在其中找到clean方法:

val cleanF = sc.clean(f)

有了这个,您可以了解为什么您的代码会根据您使用val还是lazy val给出不同的结果。

我认为最终您的代码可以重写如下:

// run spark-shell -c spark.driver.allowMultipleContexts=true
// use :paste -raw
package org.apache.spark
class Baz
trait Bar { lazy val baz = new Baz; def bar(i: Int) = baz }
case object Foo extends Bar {
val sc = new SparkContext("local[*]", "Clean", new SparkConf)
def foo = sc.clean(bar _)
}
// org.apache.spark.Foo.foo

我为此使用了spark-shell,似乎在今天构建的 Spark2.3.0-SNAPSHOT中,无论是否使用lazy关键字,代码都可以正常工作。

最新更新