我很困惑Spark是如何处理底层数据的。例如,当我运行一个流作业并应用foreachRDD
时,行为取决于变量是从外部作用域捕获还是在内部初始化。
val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
...
})
在这种情况下,我得到一个异常:
. io .NotSerializableException: org.apache.spark.streaming.kafka的对象。DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData可能作为RDD操作关闭的一部分被序列化。这是因为DStream对象是从闭包内部引用的。请重写DStream中的RDD操作以避免这种情况。这样做是为了避免Spark任务中出现不必要的对象。
但是如果我移动sparkConf
里面,一切似乎都很好:
dStream.foreachRDD(rdd => {
val sparkConf = rdd.sparkContext.getConf
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
...
})
这对我来说看起来很奇怪,因为我认为foreachRDD
运行在驱动节点上,所以我没有预料到任何差异。
现在,如果我将SQL会话和配置移到foreachRDD
之外,它再次工作正常:
val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
val df = spark.read.json(rdd)
...
})
Spark文档中的一个片段建议使用以前的版本(其中配置和SQL上下文都是在foreachRDD
中创建的),这对我来说似乎效率较低:如果可以创建一次,为什么要为每批创建它们?
有人能解释为什么异常被抛出,什么是正确的方式来创建SQL上下文?
ForeachRDD运行,顾名思义,检查流中的每个rdd为什么要在每个rdd上重新创建spark上下文?正确的方法是最后一个:
val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
val df = spark.read.json(rdd)
...
})
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
不会创建另一个SparkSession
。只有一个存在。在worker
上,只需从job
获取。
在第一种方法中,您试图为每个分区实例化spark会话对象,这是不正确的。
根据其他人的回答,使用第三种方法。但是如果你需要使用第一种方法,那么你可以使用如下-
val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
lazy val spark = SparkSession.builder.config(sparkConf).getOrCreate()
...
})
这里的Lazy求值有助于避免多次实例化spark会话,从而避免序列化问题。