是否可以在火花中的每个分区重置长蓄能器



我试图在每个分区重置longAccumulator,但无法进行。我正在以这种方式尝试。

val list = (1 to 100).toList
val rdd = sc.parallelize(list,4)
val acc = sc.longAccumulator("1L")
rdd.mapPartitionsWithIndex{(i,iterator) => 
acc.reset()
acc.add(iterator.sum)
iterator
}

当前,此代码并未在每个分区处重置累加器。在驱动程序中,我们可以通过调用reset()方法将累加器重置为零。我想问是否可以为每个分区重置累加器。

我有n个分区数。我想在List中将值存储在每个分区中。对于分区0,其总和应存储在列表的索引0上等等。

在大多数情况下,哪个值(或其总和(无趣,并且容易更改。但是,仍然可以计算。


您不想使用累加器来汇总每个分区的值。相反,您可以模拟计算总和并将其返回为新的RDD。

要以分区顺序获取总和列表,请用总和返回索引,然后对其进行排序。然后删除它。

rdd.mapPartitionsWithIndex{(i,iterator) => 
  Seq((i, iterator.reduce(_ + _))).toIterator
}.collect().sortBy(_._1).map(_._2)

这将为您提供一个和总和的数组。

根据您的末端游戏,您可能需要使用自定义地图累加器。看看这个。使用如下:

val myAcc = new ByKeyAdditiveAccumulator[Int, Long]
sparkContext.register(myAcc)
...
rdd.foreachPartition(partition => 
   acc.add((TaskContext.get.partitionId(), partition.size))
...
import scala.collection.JavaConverters._
val partitionCount = myAcc.value.asScala

相关内容

  • 没有找到相关文章

最新更新