I am trying to use Flink to read a Kafka stream of Avro-serialized data, as follows:
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(source.getTopic())
        .properties(source.getProperties())
        .startFromLatest())
    .withSchema(Schemafy.getSchemaFromJson(source.getAvroSchema()))
    .withFormat(new Avro()
        .avroSchema("{ \"namespace\": \"io.avrotweets\", \"type\": \"record\", \"name\": \"value\", \"fields\": [ { \"type\": \"string\", \"name\": \"id\" }, { \"type\": \"string\", \"name\": \"screen_name\" }, { \"type\": \"string\", \"name\": \"text\" } ]}"))
    .inAppendMode()
    .registerTableSource(source.getName());
I get the following exception:
java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:170)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -53
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:122)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
I think the problem is that the message key is also Avro-serialized, but with its own schema:
{
  "namespace": "io.avrotweets",
  "type": "record",
  "name": "key",
  "fields": [
    { "type": "string", "name": "name" }
  ]
}
But where do I tell the connector to use that schema for the key? In any case, I don't know whether this is actually the problem; it's just a guess.
The schemas differ. The one used for serialization has a different number of fields, different field names, and a different record name. As far as I know, you need to use the same Avro schema for the same object. If you only want to deserialize some of the fields, note that you can use the "default" attribute.
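As a sketch of the "default" attribute mentioned above: if the reader schema declares a default for a field that the writer did not produce, Avro's schema resolution can still deserialize the record (the record name and the names of the shared fields still have to match). The extra `lang` field below is hypothetical, purely for illustration:

```json
{
  "namespace": "io.avrotweets",
  "type": "record",
  "name": "value",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "screen_name", "type": "string" },
    { "name": "text", "type": "string" },
    { "name": "lang", "type": "string", "default": "" }
  ]
}
```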
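A minimal sketch of why a schema mismatch surfaces as "Length is negative" (this illustrates Avro's zigzag varint encoding in isolation; it does not use the Flink or Avro APIs): Avro writes a string as a zigzag-varint length followed by UTF-8 bytes. When the reader schema does not line up with the writer schema, the decoder treats arbitrary payload bytes as a length, and odd varint values zigzag-decode to negative numbers.

```java
public class ZigZagDemo {
    // Zigzag-decode a varint value, as Avro does for int/long
    // (and for the length prefix of strings and bytes).
    static long zigZagDecode(long encoded) {
        return (encoded >>> 1) ^ -(encoded & 1);
    }

    public static void main(String[] args) {
        // The ASCII byte 'i' (105), read where a length is expected,
        // zigzag-decodes to -53 -- the exact length in the exception above.
        System.out.println(zigZagDecode('i'));
    }
}
```

So a single misaligned byte from a mismatched field is enough to produce `Malformed data. Length is negative: -53`.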