Accumulator fails on cluster, works locally



In the official Spark documentation there is an example of an accumulator used in a foreach call applied directly to an RDD:

scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10

I implemented my own accumulator:

val myCounter = sc.accumulator(0)
val myRDD = sc.textFile(inputpath) // :spark.RDD[String]
myRDD.flatMap(line => foo(line)) // line 69
def foo(line: String) = {
   myCounter += 1  // line 82 throwing NullPointerException
   // compute something on the input
}
println(myCounter.value)

Locally this works fine. However, when I run the same job on a standalone Spark cluster with several machines, the workers throw a

13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247
java.lang.NullPointerException
    at MyClass$.foo(MyClass.scala:82)
    at MyClass$$anonfun$2.apply(MyClass.scala:67)
    at MyClass$$anonfun$2.apply(MyClass.scala:67)
    at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
    at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630)
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
    at spark.scheduler.ResultTask.run(ResultTask.scala:77)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

on the line that increments the accumulator myCounter.

My question is: can accumulators only be used in "top-level" anonymous functions applied directly to an RDD, and not in nested functions? If so, why does my call succeed locally but fail on the cluster?

edit: increased the verbosity of the exception.

In my case, the accumulator was null inside the closure when I created the Spark application using "extends App", as shown below:

import org.apache.spark.{SparkConf, SparkContext}

object AccTest extends App {
    val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val accum = sc.accumulator(0, "My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    println("count:" + accum.value)
    sc.stop
}

I replaced extends App with a main() method and it worked on a YARN cluster with HDP 2.4. The Spark documentation recommends defining a main() method instead of extending scala.App, which may not work correctly; a likely reason is that scala.App relies on delayed initialization, so vals defined in the object body (such as accum above) may not be initialized yet when the closure runs on the executors.

import org.apache.spark.{SparkConf, SparkContext}

object AccTest {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        val accum = sc.accumulator(0, "My Accumulator")
        sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
        println("count:" + accum.value)
        sc.stop
    }
}

This works if you define the function like this:

def foo(line: String, myc: org.apache.spark.Accumulator[Int]) = {
    myc += 1
}

and then call it like this:

foo(line, myCounter)

Note that if you use "flatMap", "myCounter" will not be updated, because "flatMap" is a lazy transformation. You can use this code instead:

myRDD.foreach(line => foo(line))
def foo(line: String) = {myCounter +=1}
println(myCounter.value)
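For completeness, here is a minimal self-contained sketch that combines the two points above: the accumulator is passed to the helper as an explicit parameter, and an action (count) is run after the flatMap so the lazy transformation actually executes before the value is read. It assumes the same old sc.accumulator API used throughout this question; the names CounterSketch and countWords are made up for illustration, and on newer Spark versions you would use sc.longAccumulator instead.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.Accumulator

object CounterSketch {
  // The helper takes the accumulator as an explicit parameter instead of
  // reading a field of the enclosing object, so nothing is null on the executors.
  def countWords(line: String, counter: Accumulator[Int]): Seq[String] = {
    counter += 1
    line.split("\\s+").toSeq // stand-in for "compute something on the input"
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CounterSketch"))
    val counter = sc.accumulator(0, "lines seen")

    val words = sc.textFile(args(0)).flatMap(line => countWords(line, counter))
    words.count() // flatMap is lazy; an action must run before the counter is meaningful
    println("lines: " + counter.value)

    sc.stop()
  }
}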
