如何为genericdata.record编写kafkaavro serde



我使用的是kafka 0.10.2和avro用于序列化我的消息,包括键和值数据。现在我想使用Kafka流,但我坚持尝试为GenericData.Record类编写Serde类。

import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]
public final class KafkaAvroSerde implements Serde<Record> {
    private final Serde<Record> inner;
    public KafkaAvroSerde() {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }
    public KafkaAvroSerde(SchemaRegistryClient client) {
        this(client, Collections.emptyMap());
    }
    public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
    }
    @Override
    public Serializer<Record> serializer() {
        return inner.serializer();
    }
    @Override
    public Deserializer<Record> deserializer() {
        return inner.deserializer();
    }
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.serializer().configure(configs, isKey);
        inner.deserializer().configure(configs, isKey);
    }
    @Override
    public void close() {
        inner.serializer().close();
        inner.deserializer().close();
    }
}

这是我在评论行中遇到的错误

Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>

我需要为GenericData.Record定义SERDE类(而不是我的特定pojo(,因为我可以在同一频道上具有不同的对象结构,因此Deserialializer应该返回我的GenericData(我将填充右边此步骤之后的Pojos(。

您将如何完成工作?谢谢

您已经在"汇合邮件"列表中提出了这个问题。这是我在那里发布的答案的摘要。

我们刚刚完成了Kafka流的官方汇合Avro Serde(特定的AVRO 通用Avro(的工作。请参阅https://github.com/confluentinc/schema-registry/tree/master/master/avro-serde。

新的AVRO SERDE(汇合模式注册表意识/兼容(将以即将到来的Confluent 3.3发布,这是几周的时间。

直到释放3.3,您可以从master分支中构建自己的工件(您必须首先与mvn install建立confluentinc/common和confluentinc/rest-utils的master分支,然后使用mvn install的Schema-Registry Project(,然后或例如将类复制到您自己的代码项目中。

注意:上方和下方项目中的master分支是开发分支,即预发行分支。这个答案的未来读者应牢记这一点。

我们还有有关如何使用新的,即将到来的汇合的Avro Serde的示例。您可以在https://github.com/confluentinc/examples的master分支中找到演示。

好吧,我想我做到了。我遵循了这个示例

https://github.com/johnreedlol/kafka-streams/blob/master/src/src/main/java/io/confluent/confluent/confluent/confluent/exampleass/toparpacticleslambdaexample.java

我使用了生成通用对象的genericavroserde类,然后我可以使用。

我希望这对其他人有用。

相关内容

  • 没有找到相关文章

最新更新