I'm trying to deserialize Kafka events in my Flink streaming job. This is my code:
...
case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))
The job throws this exception at runtime:
...
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 26 more
Process finished with exit code 1
I've read that I shouldn't be using Kryo, but I don't know how to avoid it. I tried:
executionConfig.enableForceAvro()
executionConfig.disableForceKryo()
but it didn't help.
If you can't use the Java environment to add your source (perhaps you're using the StreamExecutionEnvironment.readFile method), here is another solution: https://stackoverflow.com/a/32453031/899937, essentially:
val unmodifiableCollectionClass = Class.forName("java.util.Collections$UnmodifiableCollection")
env.getConfig.addDefaultKryoSerializer(unmodifiableCollectionClass, classOf[UnmodifiableCollectionsSerializer])
kryo-serializers is no longer included in Flink, so you have to add it as a dependency.
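For an sbt build, that dependency would look something like this (a minimal sketch, assuming version 0.45 of the artifact on Maven Central):

// build.sbt: provides the UnmodifiableCollectionsSerializer used above
libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.45"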
I ran into the same problem with Avro GenericRecord over a Kinesis data stream, using Scala 2.12 and Flink 1.11.4.
My solution was to add an implicit TypeInformation:
implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
Here is a complete code example focused on the serialization problem:
@Test def `test avro generic record serializer`(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val schema: String =
    """
      |{
      |  "namespace": "com.mberchon.monitor.dto.avro",
      |  "type": "record",
      |  "name": "TestAvro",
      |  "fields": [
      |    {"name": "strVal", "type": ["null", "string"]},
      |    {"name": "longVal", "type": ["null", "long"]}
      |  ]
      |}
    """.stripMargin
  val avroSchema = new Schema.Parser().parse(schema)
  val rec: GenericRecord = new GenericRecordBuilder(avroSchema)
    .set("strVal", "foo")
    .set("longVal", 1234L)
    .build()
  implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
  val _ = env.fromElements(rec, rec).addSink(new PrintSinkFunction[GenericRecord]())
  env.execute("Test serializer")
}
Back to your context, the following code should work:
...
case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))
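If the rest of your pipeline wants the case class rather than a GenericRecord, a map step can do the conversion. A minimal sketch, assuming org.apache.flink.streaming.api.scala._ is imported for the implicit TypeInformation:

val typed: DataStream[URLResponse] = stream.map { rec =>
  URLResponse(
    status = rec.get("status").asInstanceOf[Int], // Avro int arrives boxed as java.lang.Integer
    domain = rec.get("domain").toString,          // Avro strings arrive as org.apache.avro.util.Utf8
    url = rec.get("url").toString,
    queue = rec.get("queue").toString,
    html = rec.get("html").toString
  )
}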
I faced the same problem in Java, and the following snippet helped me resolve it:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
environment.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
You also need to add the Maven dependency that provides UnmodifiableCollectionsSerializer:
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>
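Note that UnmodifiableCollectionsSerializer lives in the de.javakaffee.kryoserializers package of that artifact, and that Class.forName throws the checked ClassNotFoundException, which the calling code has to declare or catch.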
The exception mentioned above is related to a problem in the Scala implementation of Avro deserialization. If I use the Java implementation (https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro), it works fine. My solution:
val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
    kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
  new GenericRecordAvroTypeInfo(schema))
val stream = new DataStream[GenericRecord](javaStream)
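For reference, a sketch of the imports this solution relies on, assuming Flink 1.11 with the flink-avro and flink-avro-confluent-registry modules on the classpath:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer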