I have written a consumer that uses the schema registry to read Avro generic records.
FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 = new FlinkKafkaConsumer010<>(KAFKA_TOPICS,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);
The deserialization class looks like this:
public class KafkaGenericAvroDeserializationSchema implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}
It works locally on my machine, but when I deploy it on a YARN cluster I get the following exception:
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"
Please help me resolve this issue.
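As explained on the mailing list:
The problem is that you are not providing any meaningful type information, so Flink has to fall back to Kryo. You need to extract the schema during query compilation (in main) and pass it to the deserialization schema, so that getProducedType can return Avro type information: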
public TypeInformation<T> getProducedType() {
    return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.schema);
}
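For illustration, here is a minimal sketch of how the question's class might be adapted along these lines. It assumes the flink-avro module (which provides GenericRecordAvroTypeInfo) is on the classpath and that the schema is available in main as a JSON string, for example from a bundled .avsc file. The class name SchemaAwareAvroDeserializationSchema and the field schemaJson are made up for this example. The schema is carried as a String on the assumption that your Avro version's Schema class may not be java.io.Serializable, which matters because Flink ships the deserialization schema object to the cluster using Java serialization.

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Hypothetical variant of the question's class that also carries the Avro schema.
class SchemaAwareAvroDeserializationSchema implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    // Schema kept as JSON text so this object stays Java-serializable (assumption, see lead-in).
    private final String schemaJson;
    // Not serializable, so it is created lazily on the task manager.
    private transient KafkaAvroDeserializer inner;

    SchemaAwareAvroDeserializationSchema(String registryUrl, String schemaJson) {
        this.registryUrl = registryUrl;
        this.schemaJson = schemaJson;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Avro-based type information, so Flink does not fall back to Kryo.
        return new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaJson));
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client = new CachedSchemaRegistryClient(
                    registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}
```

In main, this would be passed to the consumer in place of the original class, with the schema string as the second constructor argument. Note that the sketch assumes every record on the topic conforms to that one schema; records written with a different schema would likely fail when Flink serializes them between operators.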
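If you don't want to extract it statically, you need to tell Flink how to handle arbitrary GenericRecords. You could implement your own serializer that writes GenericRecords to byte[] and reads them back.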
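As a rough sketch of what the byte[] round trip could look like, the helper below writes the record's schema as JSON followed by the Avro binary encoding of the record, and reads both back. The class GenericRecordBytes and its method names are hypothetical, and only the Avro library is assumed. Hooking this into Flink (for example through a Kryo serializer registered on the ExecutionConfig, or a custom TypeSerializer) is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Hypothetical helper: GenericRecord <-> byte[] with the schema embedded in the payload.
final class GenericRecordBytes {

    private GenericRecordBytes() {}

    static byte[] toBytes(GenericRecord record) throws IOException {
        Schema schema = record.getSchema();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        // Length-prefixed schema JSON, so the reader can rebuild the schema.
        byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);
        out.writeInt(schemaBytes.length);
        out.write(schemaBytes);

        // Avro binary encoding of the record itself.
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return bytes.toByteArray();
    }

    static GenericRecord fromBytes(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));

        // Read the schema back first.
        byte[] schemaBytes = new byte[in.readInt()];
        in.readFully(schemaBytes);
        Schema schema = new Schema.Parser().parse(new String(schemaBytes, StandardCharsets.UTF_8));

        // The remaining bytes are the Avro-encoded record.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}
```

Embedding the full schema in every record keeps the sketch simple but is wasteful on the wire; writing a schema id and resolving it against the registry would be more compact, at the cost of extra plumbing.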
Note that I would still recommend simply bundling the schema with your Flink application rather than reinventing the wheel.