java.io.NotSerializableException with Spark Streaming Checkpointing



I have enabled checkpointing in my Spark Streaming application, and I am getting this error on a class that is pulled in as a dependency.

Without checkpointing, the application works fine.

Error:

java.io.NotSerializableException: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
- object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@46c7c593)
- field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
- object (class com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector@39d62e47)
- field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _secondary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@7a925ac4)
- field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _primary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@203b98cf)
- field (class: com.fasterxml.jackson.databind.cfg.BaseSettings, name: _annotationIntrospector, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.cfg.BaseSettings, com.fasterxml.jackson.databind.cfg.BaseSettings@78c34153)
- field (class: com.fasterxml.jackson.databind.cfg.MapperConfig, name: _base, type: class com.fasterxml.jackson.databind.cfg.BaseSettings)
- object (class com.fasterxml.jackson.databind.DeserializationConfig, com.fasterxml.jackson.databind.DeserializationConfig@2df0a4c3)
- field (class: com.fasterxml.jackson.databind.ObjectMapper, name: _deserializationConfig, type: class com.fasterxml.jackson.databind.DeserializationConfig)
- object (class com.fasterxml.jackson.databind.ObjectMapper, com.fasterxml.jackson.databind.ObjectMapper@2db07651)

I am not sure how to make this class serializable, since it comes from a Maven dependency. I am using v2.6.0 of jackson-core in my pom.xml. If I try to use a newer version of jackson-core, I get an Incompatible Jackson version exception.

Code:

liveRecordStream
  .foreachRDD(newRDD => {
    if (!newRDD.isEmpty()) {
      val cacheRDD = newRDD.cache()
      val updTempTables = tempTableView(t2s, stgDFMap, cacheRDD)
      val rdd = updatestgDFMap(stgDFMap, cacheRDD)
      persistStgTable(stgDFMap)
      dfMap
        .filter(entry => updTempTables.contains(entry._2))
        .map(spark.sql)
        .foreach(df => writeToES(writer, df))
      cacheRDD.unpersist()
    }
  })

The issue happens only when a method call occurs inside foreachRDD, such as tempTableView in this case.

tempTableView:

def tempTableView(t2s: Map[String, StructType], stgDFMap: Map[String, DataFrame], cacheRDD: RDD[cacheRDD]): Set[String] = {
  stgDFMap.keys.filter { table =>
    val tRDD = cacheRDD
      .filter(r => r.Name == table)
      .map(r => r.values)
    val tDF = spark.createDataFrame(tRDD, tableNameToSchema(table))
    if (!tRDD.isEmpty()) {
      val tName = s"temp_$table"
      tDF.createOrReplaceTempView(tName)
    }
    !tRDD.isEmpty()
  }.toSet
}

Any help is appreciated. I am not sure how to debug this and fix the issue.

I don't see where the Jackson library is invoked in the code snippet you shared. However, a NotSerializableException usually happens when you try to send an object that does not implement the Serializable interface over the wire.

Spark is a distributed processing engine: there is one driver and multiple executors spread across nodes. Only the parts of the code that need to be computed are sent by the driver to the executors (over the wire). Spark transformations run this way, i.e. across multiple nodes, so if you pass an instance of a class that does not implement the Serializable interface into such a code block (a block executed across nodes), it will throw a NotSerializableException.

Example:

import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val gson: Gson = new Gson()
  val sparkConf = new SparkConf().setMaster("local[2]")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val rdd = spark.sparkContext.parallelize(Seq("0", "1"))
  val something = rdd.map(str => {
    gson.toJson(str)
  })
  something.foreach(println)
  spark.close()
}

This code block will throw a NotSerializableException because we are sending a Gson instance into a distributed function: map is a Spark transformation, so it executes on the executors. The following will work:

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[2]")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val rdd = spark.sparkContext.parallelize(Seq("0", "1"))
  val something = rdd.map(str => {
    val gson: Gson = new Gson()
    gson.toJson(str)
  })
  something.foreach(println)
  spark.close()
}

The reason the above works is that we instantiate Gson inside the transformation, so it is instantiated on the executor; it is never sent over the wire from the driver, and therefore never needs to be serialized.
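
If creating one instance per record is a concern, a common variation is to instantiate once per partition with mapPartitions instead of map. A minimal sketch, reusing the rdd from the example above:

val something = rdd.mapPartitions { iter =>
  // Created on the executor, once per partition, so it is never
  // shipped from the driver and does not need to be serializable.
  val gson: Gson = new Gson()
  iter.map(str => gson.toJson(str))
}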

The problem is with the Jackson ObjectMapper that is getting serialized. The ObjectMapper should not be serialized. Fix this by marking the field transient: @transient val objMapper = new ObjectMapper...
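
A minimal sketch of that pattern, with a hypothetical enclosing class (RecordParser) standing in for whatever object your closure captures; combining @transient with lazy val keeps the mapper out of the serialized object graph and re-creates it on first use on each executor:

import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical enclosing class: if an instance is captured by a Spark
// closure, the whole object is serialized along with it.
class RecordParser extends Serializable {
  // @transient excludes the mapper from serialization;
  // lazy re-initializes it on first use after deserialization on the executor.
  @transient lazy val objMapper: ObjectMapper = new ObjectMapper()

  def toJson(value: AnyRef): String = objMapper.writeValueAsString(value)
}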
