scala新手函数式编程。我有以下spark代码片段:
case class SPR(symbol:String, splitOrg:Double, splitAdj:Double, timeStamp: String, unx_tt: Int)
var oldFct = 15.0
val splitMap = collection.mutable.Map[String, Double]()
val tmp = splitsData.map{ row=>
var newFct = 1.0;
var sym = row(0).toString;
oldFct = splitMap.getOrElse(sym, 1.0)
newFct = row(12).toString.toDouble * oldFct
splitMap += (sym->newFct)
SPR(row(0).toString, row(12).toString.toDouble, newFct, row(10).toString, row(13).toString.toInt)
}.collect()
println("MAP ===========" + splitMap.size)
根据我的观察,我可以在block中使用原始数据类型,但在Map对象的情况下,我总是将size设置为0。因此,似乎没有关键,价值对增加。
提前感谢。
阅读Spark文档中的理解闭包。最相关的部分(将counter
替换为splitMap
):
RDD操作在其作用域之外修改变量可能是一个常见的混淆来源…
主要的挑战是上面代码的行为是未定义的。在使用单个JVM的本地模式下,上面的代码将对RDD中的值求和,并将其存储在计数器中。这是因为RDD和变量计数器都在驱动节点上的相同内存空间中。
然而,在集群模式下,发生的事情更加复杂,上面的操作可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务——每个任务由执行器操作。在执行之前,Spark计算闭包。闭包是那些变量和方法,它们必须对执行器可见,以便在RDD上执行计算(在本例中为foreach())。这个闭包被序列化并发送给每个执行器。在本地模式下,只有一个执行器,所以所有东西都共享同一个闭包。然而,在其他模式下,情况并非如此,运行在不同工作节点上的执行器每个都有自己的闭包副本。
这里发生的事情是发送给每个执行器的闭包中的变量现在是副本,因此,当counter在foreach函数中被引用时,它不再是驱动节点上的计数器。在驱动节点的内存中仍然有一个计数器,但这对执行程序不再可见!执行程序只能看到来自序列化闭包的副本。因此,counter的最终值仍然为0,因为对counter的所有操作都引用了序列化闭包中的值。
为了在这些场景中确保良好定义的行为,应该使用Accumulator。Spark中的累加器专门用于提供一种机制,当执行在集群中的多个工作节点上分散时,可以安全地更新变量。本指南的累加器一节将更详细地讨论它们。
一般来说,闭包——像循环或局部定义的方法这样的构造——不应该用来改变一些全局状态。Spark不定义或保证从闭包外部引用的对象的突变行为。一些这样做的代码可能在本地模式下工作,但这只是偶然的,这样的代码在分布式模式下不会像预期的那样工作。如果需要全局聚合,请使用Accumulator。