org.apache.spark.SparkException: Task not serializable error in a UDF

We are trying to fetch data from Kafka and deserialize the Avro-encoded payload. The code works fine up to kafkaDataFrame, and records are read from the Kafka topic without problems, but when we try to extract the value from kafkaDataFrame with the deserialize() UDF, it throws "Task not serializable" together with java.io.NotSerializableException: io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient. Could anyone help us resolve this?

The code is copied from the repository accompanying a Medium post: https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala

import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val topic = "topic"
val kafkaUrl = "kafkaUrl"
val schemaRegistryUrl = "schemaRegistryUrl"

val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)

class AvroDeserializer extends AbstractKafkaAvroDeserializer with Serializable {
  def this(client: SchemaRegistryClient) {
    this()
    this.schemaRegistry = client
  }

  override def deserialize(bytes: Array[Byte]): String = {
    // Call the parent's deserialize; calling this.deserialize here would recurse forever.
    val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
    genericRecord.toString
  }
}

val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

object DeserializerWrapper extends Serializable {
  val deserializer = kafkaAvroDeserializer
}

spark.udf.register("deserialize", (bytes: Array[Byte]) => DeserializerWrapper.deserializer.deserialize(bytes))

val kafkaDataFrame = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaUrl)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

kafkaDataFrame.show() // This works fine in the console

val valueDataFrame = kafkaDataFrame.selectExpr("deserialize(value) AS message")
valueDataFrame.show() // Throws "org.apache.spark.SparkException: Task not serializable" (full trace below)

The full exception trace is included below for reference.

scala> valueDataFrame.show()
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:613)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
... 49 elided
Caused by: java.io.NotSerializableException: io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
Serialization stack:
- object not serializable (class: io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient, value: io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient@40199d5e)
- field (class: $iw, name: schemaRegistryClient, type: class io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient)
- object (class $iw, $iw@2d579733)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@539c833d)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7b217a33)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6b21a869)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5f849a79)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@73372652)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4ecd395f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@15dbd88e)
- field (class: $line25.$read, name: $iw, type: class $iw)
- object (class $line25.$read, $line25.$read@234a21e9)
- field (class: $iw, name: $line25$read, type: class $line25.$read)
- object (class $iw, $iw@31d635ba)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1172a648)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@69f7da24)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@53e0c50)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5c5761f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@67306a84)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@615a30bd)
- field (class: $line38.$read, name: $iw, type: class $iw)
- object (class $line38.$read, $line38.$read@6cc45cf2)
- field (class: $iw, name: $line38$read, type: class $line38.$read)
- object (class $iw, $iw@1b4bfdb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@29a1aca8)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@70ebf6d8)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@9febb7c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2f2f6aaa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5b7bccc3)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@68fa9450)
- field (class: $line41.$read, name: $iw, type: class $iw)
- object (class $line41.$read, $line41.$read@170f2883)
- field (class: $iw, name: $line41$read, type: class $line41.$read)
- object (class $iw, $iw@3fa0f38a)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@48754a85)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
- element of array (index: 3)
- array (class [Ljava.lang.Object;, size 4)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 80 more

This error means that you cannot use your instance of CachedSchemaRegistryClient on all the Spark executors, because it cannot be serialized.
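For intuition, here is a minimal sketch of the same failure mode (the class and UDF names are made up for illustration): Spark Java-serializes the UDF closure, together with everything it transitively references, when it ships tasks to the executors.

class NotSerializableClient // stands in for CachedSchemaRegistryClient

val client = new NotSerializableClient

// The closure captures `client`, so Spark must serialize it when the
// query actually runs, and that serialization fails.
spark.udf.register("touch", (s: String) => { client.hashCode; s })

// spark.range(1).selectExpr("touch(cast(id AS string))").show()
// => org.apache.spark.SparkException: Task not serializable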

Since you cannot make it serializable (you do not own the class, and serializing a class that holds IO/network resources would probably not make sense anyway), you have to create an instance on each executor somehow.

How to achieve that in your case? Honestly, I am not sure, because you are using the client inside a UDF and I do not know its lifecycle there. You should look for a way to let the UDF use the non-serializable class without capturing it in the serialized closure.
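One common pattern for that, sketched below under the assumption that a plain connection URL is all the client needs (LazyDeserializer is an illustrative name; AvroDeserializer and CachedSchemaRegistryClient are the classes from the question), is to build the client lazily inside a singleton, so that each executor JVM constructs its own client on first use instead of receiving a serialized copy.

object LazyDeserializer extends Serializable {
  private val schemaRegistryUrl = "schemaRegistryUrl" // plain String, safe to embed

  // @transient keeps these fields out of any serialized form of the object;
  // lazy defers construction until first use on whichever JVM calls it.
  @transient private lazy val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  @transient lazy val deserializer = new AvroDeserializer(client)
}

spark.udf.register("deserialize",
  (bytes: Array[Byte]) => LazyDeserializer.deserializer.deserialize(bytes))

Note that in spark-shell every line is wrapped in REPL holder classes, which is exactly what dragged schemaRegistryClient into the closure above even though DeserializerWrapper itself was declared Serializable; a pattern like this tends to behave more predictably in a compiled job (or a :paste block) than when typed line by line into the shell.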
