org.apache.spark.SparkException: Job aborted due to stage failure: java.lang.NullPointerException



I am facing an issue running a Spark job on a Hadoop/YARN cluster. It runs fine in local mode but fails in cluster mode with a NullPointerException. I am using Spark 1.6.2 and Scala 2.10.6 both locally and on the cluster. The application is a Spark Streaming application that streams data from Kafka. Below is the code where I get the null pointer: I can get data for some batches, but for other batches I get the NullPointerException (stack trace below) and the job fails. DevMain.scala:

 Line 1 val lines: DStream[(String,Array[Byte])] = myConsumer.createDefaultStream()
 Line 2 val keyDeLines = lines.map(lme.aParser)

Here is createDefaultStream():

def createDefaultStream(): DStream[(String,Array[Byte])] = {
    val consumerConfProps = List("zookeeper.connect","group.id","zookeeper.connection.timeout.ms")
    val kafkaConf = Utils.getSubProps(props,consumerConfProps)
    val topicArray = props.getProperty("topics").split(",")
    val topicMap = {
      topicArray.map((_, props.getProperty("numthreads").toInt)).toMap
    }
    KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
      kafkaConf,
      topicMap,
      StorageLevel.MEMORY_ONLY_SER
    )
  }

Here is lme.aParser:

def aParser(x: (String,Array[Byte])): Option[Map[String,Any]] = {
    logInfo("Entered lme: ")

    val decodeTry = Injection.invert(x._2)
    decodeTry match {
      case Failure(e) => {
        logInfo("Could not decode binary data: " + e.getStackTrace)
        None
      }
      case Success(eventPojo) => {
        val bs: String = eventPojo.toString
        logInfo("json: " + bs)
        None // placeholder: the Map construction is elided in the original post
      }
    }
}
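For reference, the decode-and-match pattern in aParser can be written in a self-contained way. This is only a sketch: Injection.invert is replaced by a hypothetical stand-in decoder, and an explicit guard against a null payload is added, since accessing x._2 on a null-valued record would itself throw a NullPointerException.

```scala
import scala.util.{Try, Success, Failure}

object ParserSketch {
  // Hypothetical stand-in for Injection.invert in the post
  def decode(bytes: Array[Byte]): Try[String] =
    Try(new String(bytes, "UTF-8"))

  // Same shape as aParser above, with an explicit guard on the payload
  def aParserSketch(x: (String, Array[Byte])): Option[String] =
    Option(x._2).flatMap { bytes =>      // None if the payload is null
      decode(bytes) match {
        case Failure(e)     => None      // could not decode the binary data
        case Success(event) => Some(event.toString)
      }
    }
}
```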

The code never even enters the lme.aParser function in the NullPointerException case: I have a log statement on the first line of lme.aParser, and it never appears for the failing batches.
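One pattern consistent with these symptoms (the first log line of aParser never appears, and the NPE points at the map call site, DevMain.scala:2) is a null receiver: if `lme` itself is null when the closure runs on an executor (for example, a driver-side field that is not serialized into the closure), the NPE is thrown at the call site before the method body executes. A minimal sketch of that failure mode, outside Spark and under that assumption:

```scala
// Self-contained sketch (no Spark): calling a method on a null receiver
// throws the NPE at the call site, so the method's first log line never runs.
class Lme {
  def aParser(x: (String, Array[Byte])): Option[String] = Some(x._1)
}

object NullReceiverSketch {
  val lme: Lme = null   // e.g. a field left null after deserialization on an executor

  val caught: Boolean =
    try { lme.aParser(("k", Array[Byte]())); false }
    catch { case _: NullPointerException => true }
}
```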

Here is the stack trace:

java.lang.NullPointerException
    at DevMain$$anonfun$5.apply(DevMain.scala:2)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I am new to running Spark on a cluster.

Note: I know that in the map it iterates over the elements of the DStream `lines`. But even when a batch in the DStream is empty, from the reading I have done, a map over an empty batch should not cause a failure, so please correct me if I am wrong. From some digging on my part, some posts point out that it fails while converting a Java iterator to a Scala iterator inside the Spark code, and others point out that it may be a bug in Spark's serialization code. Any pointers on the way to proceed would be appreciated.
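The point about empty batches can be checked outside Spark: a map over an empty collection never invokes the function at all, so an empty DStream batch by itself cannot produce this NullPointerException.

```scala
// Sketch: mapping an empty batch is a no-op; the function body never runs,
// so it cannot throw, which supports the "empty batch is not the cause" reading.
object EmptyBatchSketch {
  val emptyBatch: List[(String, Array[Byte])] = Nil
  val mapped: List[Int] = emptyBatch.map { case (k, v) => v.length }
}
```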

In my opinion this is caused by data loss. The producer sends messages and the consumer receives them, but when the network is congested, or data is still in memory and has not yet been flushed, messages can be lost. You can set the replication factor on a per-topic basis; other causes can also lead to data loss.
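As a sketch of the per-topic replication suggestion, assuming the ZooKeeper-based Kafka CLI of the 0.8/0.9 era that matches Spark 1.6 deployments (the topic name, partition count, and addresses are placeholders):

```shell
# Hypothetical example: the replication factor is set per topic at creation
# time, so a single broker failure does not lose unflushed messages.
kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2
```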
