Spark Streaming中的序列化问题



我很困惑Spark是如何处理底层数据的。例如,当我运行一个流作业并应用foreachRDD时,行为取决于变量是从外部作用域捕获还是在内部初始化。

val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

在这种情况下,我得到一个异常:

. io .NotSerializableException: org.apache.spark.streaming.kafka的对象。DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData可能作为RDD操作关闭的一部分被序列化。这是因为DStream对象是从闭包内部引用的。请重写DStream中的RDD操作以避免这种情况。这样做是为了避免Spark任务中出现不必要的对象。

但是如果我移动sparkConf里面,一切似乎都很好:

dStream.foreachRDD(rdd => {
    val sparkConf = rdd.sparkContext.getConf
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

这对我来说看起来很奇怪,因为我认为foreachRDD运行在驱动节点上,所以我没有预料到任何差异。

现在,如果我将SQL会话和配置移到foreachRDD之外,它再次工作正常:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})

Spark文档中的一个片段建议使用以前的版本(其中配置和SQL上下文都是在foreachRDD中创建的),这对我来说似乎效率较低:如果可以创建一次,为什么要为每批创建它们?

有人能解释为什么异常被抛出,什么是正确的方式来创建SQL上下文?

ForeachRDD运行,顾名思义,检查流中的每个rdd为什么要在每个rdd上重新创建spark上下文?正确的方法是最后一个:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})

val spark = SparkSession.builder.config(sparkConf).getOrCreate()不会创建另一个SparkSession。只有一个存在。在worker上,只需从job获取。

在第一种方法中,您试图为每个分区实例化spark会话对象,这是不正确的。

根据其他人的回答,使用第三种方法。但是如果你需要使用第一种方法,那么你可以使用如下-

val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
    lazy val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

这里的Lazy求值有助于避免多次实例化spark会话,从而避免序列化问题。

相关内容

  • 没有找到相关文章

最新更新