我将在Spark的上下文中提出这个问题,因为这就是我面临的问题,但这可能是一个简单的Java问题。
在我们的火花作业中,我们有一个Resolver
需要在我们所有的工作人员中使用(它在 udf 中使用)。问题是它不可序列化,我们不能将其更改为可序列化。解决方案是将其作为另一个可序列化类的成员。
所以我们最终得到:
public class Analyzer implements Serializable {
transient Resolver resolver;
public Analyzer() {
System.out.println("Initializing a Resolver...");
resolver = new Resolver();
}
public int resolve(String key) {
return resolver.find(key);
}
}
然后我们使用 Spark APIbroadcast
这个类:
val analyzer = sparkContext.broadcast(new Analyzer())
(有关Spark广播的更多信息可以在这里找到)
然后,我们继续在 UDF 中使用analyzer
,作为 Spark 代码的一部分,如下所示:
val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()
这一切都按预期工作,但让我们想知道。
Resolver
不实现Serializable
,因此被标记为transient
- 这意味着它不会与它的所有者对象一起序列化Analyzer
。
但是从上面的代码中可以清楚地看到,resolve()
方法使用resolver
,所以它不能为 null。事实并非如此。代码有效。
那么,如果字段未通过序列化传递,则如何实例化resolver
成员?
我最初的想法是,也许Analyzer
构造函数是在接收端调用的(即火花工作者),但随后我希望看到该行"Initializing a Resolver..."
打印几次。但它只打印一次,这可能表明它只被调用一次,就在它传递给广播 API 之前。那么为什么resolver
不为空呢?
我是否缺少有关 JVM 序列化或 Spark 序列化的内容?
这段代码是如何工作的?
Spark 以cluster
模式在 YARN 上运行。spark.serializer
设置为org.apache.spark.serializer.KryoSerializer
。
因此,如果字段未通过序列化传递,则 解析程序成员已实例化?
当调用kryo.readObject
时,它通过构造函数调用(new Resolver
)实例化:
kryo.readClassAndObject(input).asInstanceOf[T]
我最初的想法是,也许分析器构造函数被称为 在接收方(即火花工人),但随后我会期望 以查看"正在初始化解析程序..."打印了几次。 但它只打印一次,这可能是对 它只被调用一次的事实
这不是广播变量的工作方式。发生的情况是,当每个执行器需要在作用域中使用广播变量时,它首先检查其BlockManager
内存中是否有该对象,如果没有,它会向驱动程序或邻居执行器(如果同一工作器节点上有多个执行器)询问其缓存实例,然后对其进行序列化并将其发送给他, 反过来,他接收实例并将其缓存在自己的BlockManager
中。
这记录在TorrentBroadcast
的行为中(这是默认的广播实现):
* The driver divides the serialized object into small chunks and
* stores those chunks in the BlockManager of the driver.
*
* On each executor, the executor first attempts to fetch the object from its BlockManager. If
* it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
* other executors if available. Once it gets the chunks, it puts the chunks in its own
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor).
如果我们删除瞬态,它会失败,并且堆栈跟踪导致 Kryo
这是因为您的Resolver
类中可能有一个字段,即使 Kryo 也无法序列化,无论Serializable
属性如何。