我使用的是kafka 0.10.2和avro用于序列化我的消息,包括键和值数据。现在我想使用Kafka流,但我坚持尝试为GenericData.Record
类编写Serde
类。
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]
public final class KafkaAvroSerde implements Serde<Record> {
private final Serde<Record> inner;
public KafkaAvroSerde() {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public KafkaAvroSerde(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
}
@Override
public Serializer<Record> serializer() {
return inner.serializer();
}
@Override
public Deserializer<Record> deserializer() {
return inner.deserializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
}
这是我在评论行中遇到的错误
Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>
我需要为GenericData.Record
定义SERDE类(而不是我的特定pojo(,因为我可以在同一频道上具有不同的对象结构,因此Deserialializer应该返回我的GenericData
(我将填充右边此步骤之后的Pojos(。
您将如何完成工作?谢谢
您已经在"汇合邮件"列表中提出了这个问题。这是我在那里发布的答案的摘要。
我们刚刚完成了Kafka流的官方汇合Avro Serde(特定的AVRO 通用Avro(的工作。请参阅https://github.com/confluentinc/schema-registry/tree/master/master/avro-serde。
新的AVRO SERDE(汇合模式注册表意识/兼容(将以即将到来的Confluent 3.3发布,这是几周的时间。
直到释放3.3,您可以从master
分支中构建自己的工件(您必须首先与mvn install
建立confluentinc/common和confluentinc/rest-utils的master
分支,然后使用mvn install
的Schema-Registry Project(,然后或例如将类复制到您自己的代码项目中。
注意:上方和下方项目中的
master
分支是开发分支,即预发行分支。这个答案的未来读者应牢记这一点。
我们还有有关如何使用新的,即将到来的汇合的Avro Serde的示例。您可以在https://github.com/confluentinc/examples的master
分支中找到演示。
好吧,我想我做到了。我遵循了这个示例
https://github.com/johnreedlol/kafka-streams/blob/master/src/src/main/java/io/confluent/confluent/confluent/confluent/exampleass/toparpacticleslambdaexample.java
我使用了生成通用对象的genericavroserde类,然后我可以使用。
我希望这对其他人有用。