如何在RDD映射操作中更新全局变量



我有RDD[(Int, Array[Double])],之后,我调用了一个classFunction

val rdd = spark.sparkContext.parallelize(Seq(
(1, Array(2.0,5.0,6.3)),
(5, Array(1.0,3.3,9.5)),
(1, Array(5.0,4.2,3.1)),
(2, Array(9.6,6.3,2.3)),
(1, Array(8.5,2.5,1.2)),
(5, Array(6.0,2.4,7.8)),
(2, Array(7.8,9.1,4.2))
)
)
val new_class = new ABC
new_class.demo(data)

在类内部,声明了一个全局变量值 =0。在 demo() 中声明了新变量 new_value = 0。映射操作后,new_value将更新,并在映射中打印更新的值。

class ABC extends Serializable {
var value  = 0
def demo(data_new : RDD[(Int ,Array[Double])]): Unit ={
var new_value = 0
data_new.coalesce(1).map(x => {
if(x._1 == 1)
new_value = new_value + 1
println(new_value)
value = new_value
}).count()
println("Outside-->" +value)
}
}

输出:-

1
1
2
2
3
3
3
Outside-->0

映射操作后如何更新全局变量值?

我不确定你在做什么,但你需要使用累加器来执行需要添加此类值的操作类型。

这是一个例子:

scala> val rdd = spark.sparkContext.parallelize(Seq(
|         (1, Array(2.0,5.0,6.3)),
|         (5, Array(1.0,3.3,9.5)),
|         (1, Array(5.0,4.2,3.1)),
|         (2, Array(9.6,6.3,2.3)),
|         (1, Array(8.5,2.5,1.2)),
|         (5, Array(6.0,2.4,7.8)),
|         (2, Array(7.8,9.1,4.2))
|       )
| )
rdd: org.apache.spark.rdd.RDD[(Int, Array[Double])] = ParallelCollectionRDD[83] at parallelize at <console>:24
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 46181, name: Some(My Accumulator), value: 0)
scala> rdd.foreach { x => if(x._1 == 1) accum.add(1) }
scala> accum.value
res38: Long = 3

正如@philantrovert所提到的,如果您希望计算每个键的出现次数,您可以执行以下操作:

scala> rdd.mapValues(_ => 1L).reduceByKey(_ + _).take(3)
res41: Array[(Int, Long)] = Array((1,3), (2,2), (5,2))                          

您也可以使用countByKey但要避免使用大型数据集。

不,您无法从映射内部更改全局变量。

如果您尝试计算函数中 1 的数量,则可以使用过滤器

val value = data_new.filter(x => (x._1 == 1)).count 
println("Outside-->" +value)

输出:

Outside-->3

此外,不建议在var使用可变变量。您应该始终尝试使用不可变作为val

我希望这有帮助!

OR You can do achieve your problem in this way also:
class ABC extends Serializable {
def demo(data_new : RDD[(Int ,Array[Double])]): Unit ={
var new_value = 0
data_new.coalesce(1).map(x => {
if(x._1 == 1)
var key = x._1
(key, 1)
}).reduceByKey(_ + _)
}
println("Outside-->" +demo(data_new))
}

相关内容

  • 没有找到相关文章

最新更新