I am facing a problem running a Spark job on a Hadoop/YARN cluster: it runs fine in local mode but fails in cluster mode with this NullPointerException. I am using Spark 1.6.2 and Scala 2.10.6 both locally and on the cluster. The application is a streaming app that consumes data from Kafka. Below is the code where I get the null pointer; I receive data fine for some batches, but for others I get the NPE and, per the stack trace below, the job fails here.
DevMain.scala:
Line 1: val lines: DStream[(String,Array[Byte])] = myConsumer.createDefaultStream()
Line 2: val keyDeLines = lines.map(lme.aParser)
Here is createDefaultStream():
def createDefaultStream(): DStream[(String,Array[Byte])] = {
val consumerConfProps = List("zookeeper.connect","group.id","zookeeper.connection.timeout.ms")
val kafkaConf = Utils.getSubProps(props,consumerConfProps)
val topicArray = props.getProperty("topics").split(",")
val topicMap = {
topicArray.map((_, props.getProperty("numthreads").toInt)).toMap
}
KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
kafkaConf,
topicMap,
StorageLevel.MEMORY_ONLY_SER
)
}
Here is lme.aParser:
def aParser(x: (String,Array[Byte])): Option[Map[String,Any]] = {
  logInfo("Entered lme: ")
  val decodeTry = Injection.invert(x._2)
  decodeTry match {
    case Failure(e) => {
      // mkString so the frames are logged rather than the array's toString
      logInfo("Could not decode binary data: " + e.getStackTrace.mkString("\n"))
      None
    }
    case Success(eventPojo) => {
      val bs: String = eventPojo.toString
      logInfo("json: " + bs)
      // the original snippet ends here without a return value; presumably a
      // Some(...) built from the decoded event, e.g.:
      Some(Map("json" -> bs))
    }
  }
}
In the null-pointer case the code never enters the lme.aParser function: I log at line 1 of aParser (the logInfo("Entered lme: ") call), and that log line never appears for the failing batches.
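One scenario consistent with both observations (the NPE fires inside the map closure at DevMain.scala line 2, yet aParser's first log line never appears, and only in cluster mode) is that the lme object itself is null on the executor, e.g. a field that is initialized on the driver but not serialized with the closure. This is an assumption on my part, not something the post confirms; a minimal plain-Scala reproduction of that failure shape:

```scala
object NpeRepro {
  class Lme {
    def aParser(x: (String, Array[Byte])): Option[String] = {
      println("Entered lme")                 // never printed in the failing case
      Option(x._2).map(new String(_, "UTF-8"))
    }
  }

  def main(args: Array[String]): Unit = {
    val lme: Lme = null                      // stands in for a field left null on the executor
    val records = List(("k", "v".getBytes("UTF-8")))
    try {
      // lme.aParser is eta-expanded into a closure capturing the null lme;
      // the NPE is thrown when the closure is applied, before the method body runs
      records.map(lme.aParser)
    } catch {
      case _: NullPointerException =>
        println("NPE raised before aParser was ever entered")
    }
  }
}
```

In local mode driver and "executor" share one JVM, so a driver-side initialization would mask this; that would match the local-vs-cluster difference described above.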
Here is the stack trace:
java.lang.NullPointerException
at DevMain$$anonfun$5.apply(DevMain.scala:2)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I am new to running Spark on a cluster.
Note: I understand that in the map it iterates over the elements of the lines DStream. But from the reading I have done, an empty batch should not cause this: if a DStream batch is empty, the map should simply produce an empty result rather than fail, so please correct me if I am wrong. Digging around on my part, some people pointed out that it fails while converting a Java iterator to a Scala iterator inside Spark's code, and others pointed out that it could be a bug in Spark's serialization code.
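A separate defensive check worth trying (again an assumption, not something the post confirms): Kafka can deliver records whose value is null (e.g. log-compaction tombstones), and a null payload would blow up any decoder that dereferences it. A plain-Scala sketch of the guard, using a hypothetical safeDecode in place of aParser:

```scala
object NullPayloadGuard {
  // Hypothetical stand-in for aParser: Option(...) maps a null record or a
  // null payload to None instead of letting an NPE escape the closure.
  def safeDecode(record: (String, Array[Byte])): Option[String] =
    Option(record)
      .flatMap(r => Option(r._2))
      .map(bytes => new String(bytes, "UTF-8"))

  def main(args: Array[String]): Unit = {
    val batch = List(("k1", "ok".getBytes("UTF-8")), ("k2", null), null)
    println(batch.flatMap(safeDecode))       // only the decodable record survives
  }
}
```

In the streaming job the equivalent would be filtering or flatMapping with such a guard before any decoding step.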
In my opinion this is caused by data loss. The producer sends messages and the consumer receives them, but when the network is congested, or data still sitting in memory has not been flushed to disk, messages can be lost. You can set the replication factor on a per-topic basis; other misconfigurations can also cause data loss.
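For reference, on the 0.8/0.9-era Kafka tooling that the zookeeper.connect setting above suggests, the replication factor is set per topic at creation time. A sketch (host names and counts are placeholders):

```shell
# Create a topic with replication factor 3 across the brokers
kafka-topics.sh --create --zookeeper zk-host:2181 \
  --replication-factor 3 --partitions 4 --topic my-topic
```

Raising the replication factor of an existing topic is a separate operation (a partition reassignment), not a simple flag on this command.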