Flink throws java.io.NotSerializableException



我做了自定义KeyedDeserializationSchema来反序列化 kafka 消息并像这样使用它:

// Minimal reproduction of the NotSerializableException described in this post.
// NOTE(review): the anonymous Deser instances below do NOT extend Serializable;
// per the stack trace further down, that is what Flink's ClosureCleaner rejects.
object Job {
// Envelope for one deserialized Kafka record.
// (NOTE: "partiton" is a typo in the original field name — kept as-is.)
case class KafkaMsg[K, V](
key: K, value: V, topic: String, partiton: Int, offset: Long)
// Type class: decodes a raw byte array into an A.
trait Deser[A] {
def deser(a: Array[Byte]): A
}
object Deser {
// Summoner: Deser[A] resolves to the implicit instance in scope.
def apply[A](implicit sh: Deser[A]): Deser[A] = sh
def deser[A: Deser](a: Array[Byte]) = Deser[A].deser(a)
// Stub instance: always yields "" (placeholder, not a real decoder).
implicit val stringDeser: Deser[String] =
new Deser[String] {
def deser(a: Array[Byte]): String = ""
}
// Stub instance: always yields 0.
implicit val longDeser: Deser[Long] =
new Deser[Long] {
def deser(a: Array[Byte]): Long = 0
}
}
// The context bounds (K: Deser: TypeInformation, ...) desugar into implicit
// constructor parameters, so the Deser/TypeInformation evidence objects become
// fields of this schema instance and must be serializable along with it.
class TypedKeyedDeserializationSchema[
K: Deser: TypeInformation,
V: Deser: TypeInformation
] extends KeyedDeserializationSchema[KafkaMsg[K, V]] {
// Decode one Kafka record (key + value) into the KafkaMsg envelope.
def deserialize(key:   Array[Byte],
value: Array[Byte],
topic: String,
partition: Int,
offset:    Long
): KafkaMsg[K, V] =
KafkaMsg(Deser[K].deser(key),
Deser[V].deser(value),
topic,
partition,
offset
)
// Unbounded stream: never signals end-of-stream.
def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false
def getProducedType(): TypeInformation[KafkaMsg[K, V]] =
createTypeInformation
}
def main(args: Array[String]) {
// Kafka consumer configuration for a local broker.
val properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "flink-test")
val env = StreamExecutionEnvironment.getExecutionEnvironment
// addSource triggers ClosureCleaner.ensureSerializable on the schema,
// which is where the exception below is thrown.
val stream = env
.addSource(new FlinkKafkaConsumer011(
"topic",
new TypedKeyedDeserializationSchema[String, Long],
properties
))
.print
env.execute("Flink Scala API Skeleton")
}
}

这给了我:

[error] Caused by: java.io.NotSerializableException: l7.Job$Deser$$anon$7
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error]         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error]         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error]         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error]         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error]         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error]         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error]         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error]         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error]         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[error]         at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
[error]         at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
[error]         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
[error]         at l7.Job$.main(Job.scala:89)
[error]         at l7.Job.main(Job.scala)

问题显然出在我的 `Deser` 类型类的实例实现中,但我不明白究竟是什么导致了此错误,或者如何解决它。

是的,此错误的原因是您的 `Deser` 实例与 `TypeInformation` 不同,没有扩展/实现 `Serializable`。要找出为什么会发生这种情况,您可以先问自己一个问题:为什么我需要声明 `implicit val stringDeser` 和 `implicit val longDeser`?

答案是 Scala 编译器在看到K: Deser: TypeInformation形式的通用约束时会做什么。它的作用是使用implicit证据对象重写它。所以你的代码被转换成这样的东西:

// Illustration only: this is roughly what the compiler generates from the
// context-bound form above. The evidence objects become implicit constructor
// parameters and therefore fields of the (serialized) schema instance.
class TypedKeyedDeserializationSchema[K, V](implicit val kDeserEv: Deser[K],
val kTypeInfoEn: TypeInformation[K],
val vDeserEv: Deser[V],
val vTypeInfoEn: TypeInformation[V]) extends KeyedDeserializationSchema[KafkaMsg[K, V]] {
def deserialize(key: Array[Byte],
value: Array[Byte],
topic: String,
partition: Int,
offset: Long
): KafkaMsg[K, V] =
// The calls now go through the captured evidence fields directly.
KafkaMsg(kDeserEv.deser(key),
vDeserEv.deser(value),
topic,
partition,
offset
)
def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false
def getProducedType(): TypeInformation[KafkaMsg[K, V]] = createTypeInformation
}

现在很明显,类型为 `TypedKeyedDeserializationSchema[String, Long]` 的对象实际上包含两个字段,类型分别是 `Deser[String]` 和 `Deser[Long]`,其中保存着您在上面声明的 `implicit val` 的值。因此,当 Flink 尝试确保您传递给它的对象是 `Serializable`(可序列化)时,检查失败了。

现在的解决方案很明显:让你的特质Deser[A]扩展Serializable

// The fix: make the type class itself Serializable, so every instance
// (including the anonymous implicit vals) can be shipped with the schema.
trait Deser[A] extends Serializable {
def deser(a: Array[Byte]): A
}

相关内容

  • 没有找到相关文章

最新更新