Spark如何传递Scala对象到映射的转换



我不知道Spark是如何将对象传递给执行器的,所以我写这篇文章是为了测试。(在1个执行器中运行(

在executor中,z的init值是100,尽管我在驱动程序中将其设置为0。并且在四个map函数期间,其值保持增长而不重置为100。将z更改为1000的修改也被执行器忽略。

为什么会发生这种情况?Spark如何将对象传递给map变压器?

object main extends App {
val a = sc.parallelize((0 until 10).toList)
A.z = 0
println(a.map(x=>A.kk()).collect().mkString(","))
println(a.map(x=>A.kk()).collect().mkString(","))
println(s"driver z: ${A.z}")
A.z=1000
println("change z to 1000")
println(a.map(x=>A.kk()).collect().mkString(","))
println(a.map(x=>A.kk()).collect().mkString(","))
}
object A{
var z =100
def kk(): Int ={
z+=1
z
}
}

输出为

104,105,101,102,103,106,107,108,109,110
114,115,111,112,113,116,117,118,119,120
driver z: 0
change z to 1000
121,122,123,124,125,126,127,128,129,130
131,133,132,134,135,136,137,138,139,140

如果您想操作从驱动程序传递给执行程序的对象(无论您有1个执行程序还是N个(,您都必须使用累加器。

Spark不传递在Driver中操作的对象,而是序列化原始对象(无论更改如何,它都是您的"z=100"(。此外,对执行程序中的对象所做的所有更改在驱动程序中都不可见。

尝试:

// Driver
val acc = sparkSession.sparkContext.longAccumulator("foo")
// Executor/s
acc.add(...)
acc.reset()

请注意,每个执行人都有自己的累加器副本。对数据集执行操作后,将调用"merge"函数。

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#accumulators

最新更新