Scala Spark中的NullPointerException,似乎是由集合类型引起的



sessionIdList的类型:

scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

当我试图运行下面的代码:

val x = sc.parallelize(List(1,2,3)) 
val cartesianComp = x.cartesian(x).map(x => (x))
val kDistanceNeighbourhood = sessionIdList.map(s => {
    cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)

I receive exception:

14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

但是如果我使用:

val l = sc.parallelize(List("1","2")) 
val kDistanceNeighbourhood = l.map(s => {    
    cartesianComp.filter(v => v != null)
})
kDistanceNeighbourhood.take(1)

则不显示异常

两个代码片段的区别在于,在第一个代码片段中,sessionIdList的类型是:

res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

,在第二个代码片段中"l"的类型是

scala> l
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12

为什么会出现这个错误?

我是否需要将sessionIdList转换为ParallelCollectionRDD以修复此问题?

Spark不支持RDD嵌套(参见https://stackoverflow.com/a/14130534/590203了解同样问题的另一个发生情况),因此您不能在其他RDD操作中对RDD执行转换或操作。

在第一种情况下,当worker试图访问一个SparkContext对象时,你会看到一个NullPointerException,该对象只存在于驱动程序中,而不存在于worker中。

在第二种情况下,我的直觉是作业是在驱动程序上本地运行的,完全是偶然的。

这是一个合理的问题,我已经听人问过很多次了。我将试着解释为什么这是正确的,因为它可能会有所帮助。

在生产环境中,嵌套的rdd将总是抛出异常。嵌套函数调用,正如我认为你在这里描述的那样,如果它意味着在RDD操作中调用RDD操作,也会导致失败,因为它实际上是一样的。(RDD是不可变的,因此执行RDD操作(如"map")相当于创建一个新的RDD。)创建嵌套RDD的能力是RDD定义方式和Spark应用程序设置方式的必然结果。

RDD是驻留在Spark executor上的对象(称为分区)的分布式集合。Spark执行器之间不能通信,只能与Spark驱动通信。RDD操作都是在这些分区上分段计算的。因为RDD的执行器环境不是递归的(也就是说,你可以将Spark驱动配置到带有子执行器的Spark执行器上),RDD也不能。

在你的程序中,你创建了一个整数分区的分布式集合。然后执行映射操作。当Spark驱动程序看到映射操作时,它将执行映射的指令发送给执行程序,执行程序并行地在每个分区上执行转换。但是您的映射无法完成,因为在每个分区上您都试图调用"整个RDD"来执行另一个分布式操作。这是不可能的,因为每个分区不能访问其他分区上的信息,如果可以,计算就不能并行运行。

您可以做的是,因为您在映射中需要的数据可能很小(因为您正在做一个过滤器,并且过滤器不需要关于sessionIdList的任何信息),首先过滤会话ID列表。然后把清单交给司机。然后将其广播给执行者,您可以在地图中使用它。如果sessionID列表太大,您可能需要执行连接。

相关内容

最新更新