我试图在每个分区重置longAccumulator
,但无法进行。我正在以这种方式尝试。
val list = (1 to 100).toList
val rdd = sc.parallelize(list,4)
val acc = sc.longAccumulator("1L")
rdd.mapPartitionsWithIndex{(i,iterator) =>
acc.reset()
acc.add(iterator.sum)
iterator
}
当前,此代码并未在每个分区处重置累加器。在驱动程序中,我们可以通过调用reset()
方法将累加器重置为零。我想问是否可以为每个分区重置累加器。
我有n个分区数。我想在List
中将值存储在每个分区中。对于分区0,其总和应存储在列表的索引0上等等。
在大多数情况下,哪个值(或其总和(无趣,并且容易更改。但是,仍然可以计算。
您不想使用累加器来汇总每个分区的值。相反,您可以模拟计算总和并将其返回为新的RDD。
要以分区顺序获取总和列表,请用总和返回索引,然后对其进行排序。然后删除它。
rdd.mapPartitionsWithIndex{(i,iterator) =>
Seq((i, iterator.reduce(_ + _))).toIterator
}.collect().sortBy(_._1).map(_._2)
这将为您提供一个和总和的数组。
根据您的末端游戏,您可能需要使用自定义地图累加器。看看这个。使用如下:
val myAcc = new ByKeyAdditiveAccumulator[Int, Long]
sparkContext.register(myAcc)
...
rdd.foreachPartition(partition =>
acc.add((TaskContext.get.partitionId(), partition.size))
...
import scala.collection.JavaConverters._
val partitionCount = myAcc.value.asScala