在dtaframe.map函数中更新累加器的值



我无法更新dataframe.map函数中的累加器值。PFB 代码相同。

case class TestPerson(name: String, age: Long, salary: Double)
val tom = TestPerson("Tom Hanks",37,35.5)
val sam = TestPerson("Sam Smith",40,40.5)
val stev = TestPerson("Stev Smith",45,30.5)
val PersonList = scala.collection.mutable.MutableList[TestPerson]()
PersonList += tom
PersonList += sam
PersonList += stev
val personDF = PersonList.toDF()
class ListAccumulatorParam[B] extends AccumulatorParam[List[Row]] {
  def zero(initialValue: List[Row]): List[Row] = {
    List.empty
  }
  def addInPlace(l1: List[Row],l2: List[Row]): List[Row] = {
    l1 ::: l2
  }  
}
var listAccum = sc.accumulator(List[Row]())(new ListAccumulatorParam[Row]())
personDF.map { row => listAccum += List(row)}

列表累积正在变为空白。

但与此同时,如果我进行并行化,然后检查值值是否在累加器中更新。 sc.parallelize(personDF.collect(((.foreach(row => listAccum += List(row((

实际用例是我想在同一行上执行更多操作..如果该操作失败,那么我希望返回该组行...这就是我想要累加器中那些行的原因。

我是否以错误的方式做了什么,因为该列表Accum变得空白?

我没有运行任何操作,因为它没有给出任何价值。

在 什么时候蓄能器真正可靠?

最新更新