作为参数传递的广播值中的NullPointer



:(

我想说,我是Spark的新手,因为很多帖子都是从。。但事实上,我并不是那么新鲜。尽管如此,我还是面临着广播变量的问题。

当一个变量被广播时,每个执行器都会收到它的副本。稍后,当这个变量在执行器中执行的代码部分(比如map或foreach(中被引用时,如果在驱动程序中设置的变量引用没有传递给它,执行器就不知道我们在说什么。我认为这可以很好地解释

我的问题是,我得到了一个nullPointerException,甚至很难。我将广播引用传递给了执行器。

class A {
var broadcastVal: Broadcast[Dataframe] = _
...
def method1 {
broadcastVal = otherMethodWhichSendBroadcast
doSomething(broadcastVal, others)
}
}
class B {
def doSomething(...) {
forEachPartition {x => doSomethingElse(x, broadcasVal)}
}
}
object C {
def doSomethingElse(...) {
broadcastVal.value.show --> Exception
}
}

我错过了什么?

提前感谢!

RDD和DataFrames已经是分布式结构,不需要将它们作为局部变量进行广播。(org.apache.spark.sql.functions.broadcast()函数(在执行联接时使用(不是本地变量广播(


即使您尝试代码语法,它也不会显示任何编译错误,而是会像NullPointerException一样抛出RuntimeException,这是100%有效的

解释行为的示例:

package examples
import org.apache.log4j.Level
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}
object BroadCastCheck extends App {
org.apache.log4j.Logger.getLogger("org").setLevel(Level.OFF)
val spark = SparkSession.builder().appName(getClass.getName).master("local").getOrCreate()
val sc = spark.sparkContext
val df = spark.range(100).toDF()
var broadcastVal: Broadcast[DataFrame] = sc.broadcast(df)

val t1 = sc.parallelize(0 until 10)
val t2 = sc.broadcast(2) // this is right since its local variable can be primitive or map or any scala collection
val t3 = t1.filter(_ % t2.value == 0).persist() //this is the way of ha
t3.foreach {
x =>
println(x)
// broadcastVal.value.toDF().show // null pointer  wrong way
//   spark.range(100).toDF().show // null pointer  wrong way
}
}

结果:(如果您在上面的代码中取消注释broadcastVal.value.toDF().showspark.range(100).toDF().show(

Caused by: java.lang.NullPointerException
at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:56)
at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics$lzycompute(WholeStageCodegenExec.scala:528)
at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics(WholeStageCodegenExec.scala:527)

在这里进一步阅读广播变量和广播函数之间的区别。。。