Scala Spark NullPointerException when submitting a jar, but not in the shell



My Spark job throws a NullPointerException that I cannot trace down. When I print the potentially null variables, they are all populated on every worker. My data contains no null values, as the same job works in the spark-shell. The job's execute function is below, followed by the error message.

All helper methods not defined inside the function are defined in the body of the Spark job object, so I assumed closures were not the problem.
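For context, the surrounding object looks roughly like this. This is a hypothetical reconstruction, not the actual source: the SparkJob trait and the sample Params values are assumptions, though the field and helper names come from the code below. The point is that params lives on the job object itself, so any RDD closure that touches it captures the enclosing object.

import org.apache.spark.SparkContext

// Hypothetical reconstruction of the layout described above:
case class Params(targetClientHost: String, targetIndex: String,
                  entityTypes: List[String], sampleRate: Double)

trait SparkJob { def execute(sc: SparkContext): Unit }

object GazetteerGenerator extends SparkJob {
  // A field of the object: referenced from RDD closures below
  val params = Params("localhost:9200", "news/articles",
    List("HealthCondition", "Drug", "FieldTerminology"), 0.1)

  // Helper defined on the object body, also referenced from closures
  def cleanUpText(s: String): String = s.trim.toLowerCase

  override def execute(sc: SparkContext): Unit = {
    // body shown below
  }
}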

// Imports used by this snippet (declared at the top of the file); toJson,
// JavaJsonUtils, SearchableNewsArticle, printToFile and params are defined
// on the enclosing job object.
import java.io.File
import java.nio.file.{Files, Paths}
import scala.collection.immutable.HashMap
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

override def execute(sc: SparkContext) = {
  // Build the nested Elasticsearch query matching the target entity types
  def construct_query(targetTypes: List[String]) = Map("query" ->
    Map("nested" ->
      Map("path" -> "annotations.entities.items",
        "query" -> Map("terms" ->
          Map("annotations.entities.items.type" -> targetTypes)))))

  val sourceConfig = HashMap(
    "es.nodes" -> params.targetClientHost
  )

  // Base Elasticsearch RDD returning articles which match the above query on entity types
  val rdd = EsSpark.esJsonRDD(sc,
    params.targetIndex,
    toJson(construct_query(params.entityTypes)),
    sourceConfig
  ).sample(false, params.sampleRate)

  // Map the ES JSON into news article objects, then extract the entities list of
  // well-defined annotations
  val objectsRDD = rdd.map(tuple => {
    val maybeArticle =
      try {
        Some(JavaJsonUtils.fromJson(tuple._2, classOf[SearchableNewsArticle]))
      } catch {
        case e: Exception => None
      }
    (tuple._1, maybeArticle)
  }).filter(tuple => tuple._2.isDefined && tuple._2.get.annotations.isDefined &&
    tuple._2.get.annotations.get.entities.isDefined)
    .map(tuple => (tuple._1, tuple._2.get.annotations.get.entities.get))

  // Flat-map the RDD of entities lists into (entity text, (entity type, 1)) tuples
  val entityDataMap: RDD[(String, (String, Int))] = objectsRDD.flatMap(tuple => tuple._2.items.collect({ // line 79
    case item if (item.`type`.isDefined) && (item.text.isDefined) &&
      (params.entityTypes.contains(item.`type`.get)) => (cleanUpText(item.text.get), (item.`type`.get, 1)) // line 81
  }))

  // Bucketize the tuples RDD into (entity text, (entity_type, entity_count)) to make
  // count aggregation and file write-outs easier to follow
  val finalResults: Array[(String, (String, Int))] =
    entityDataMap.reduceByKey((x, y) => (x._1, x._2 + y._2)).collect()

  val entityTypeMapping = Map(
    "HealthCondition" -> "HEALTH_CONDITION",
    "Drug" -> "DRUG",
    "FieldTerminology" -> "FIELD_TERMINOLOGY"
  )

  for ((entityText, (entityType, _)) <- finalResults) {
    // Skip entity types without an output-file mapping rather than calling
    // .get on a None, which would throw
    entityTypeMapping.get(entityType).foreach { fileName =>
      if (!Files.exists(Paths.get(fileName + ".txt"))) {
        val header = new java.io.FileOutputStream(new File(fileName + ".txt"), false)
        printToFile(header)(p => p.println(fileName))
      }
      val out = new java.io.FileOutputStream(new File(fileName + ".txt"), true)
      printToFile(out)(p => p.println(entityText))
    }
  }
}

The error message is below:

java.lang.NullPointerException
    at com.liquid.gazetteers.GazetteerGenerator$$anonfun$4$$anonfun$apply$1.isDefinedAt(GazetteerGenerator.scala:81)
    at com.liquid.gazetteers.GazetteerGenerator$$anonfun$4$$anonfun$apply$1.isDefinedAt(GazetteerGenerator.scala:79)
    at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
    at com.liquid.gazetteers.GazetteerGenerator$$anonfun$4.apply(GazetteerGenerator.scala:79)
    at com.liquid.gazetteers.GazetteerGenerator$$anonfun$4.apply(GazetteerGenerator.scala:79)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This problem has been resolved. The params attribute was not being serialized and made available to the Spark workers. The solution is to form a Spark broadcast variable within the scope where the params attribute is required, as sketched below.
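A minimal, self-contained sketch of the fix (the data and object name here are illustrative, not the actual job): ship the value the closure needs as a broadcast variable and reference only the broadcast handle inside the closure, so the enclosing job object never has to be serialized to the workers.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastFixSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))

    // Serialized once and shipped to each executor
    val entityTypesBc = sc.broadcast(List("HealthCondition", "Drug", "FieldTerminology"))

    val items = sc.parallelize(Seq(("aspirin", "Drug"), ("ignored", "Other"), ("aspirin", "Drug")))

    // The closure captures only entityTypesBc (a small, serializable handle),
    // not an object field, so nothing arrives null on a worker.
    val counted = items
      .collect { case (text, tpe) if entityTypesBc.value.contains(tpe) => (text, (tpe, 1)) }
      .reduceByKey((x, y) => (x._1, x._2 + y._2))
      .collect()

    counted.foreach(println) // e.g. (aspirin,(Drug,2))
    sc.stop()
  }
}

Applied to the original job, the same pattern means broadcasting params.entityTypes before the flatMap at line 79 and testing entityTypesBc.value.contains(item.`type`.get) inside the closure instead of touching params directly.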
