我需要使用来自一个具有多个avro模式的主题的消息。
我使用c#libConfluent.SchemaRegistry
和Confluent.Kafka
来生成我的消费者。
我试图在不传递avro模式的情况下使用GenericRecord
类型来反序列化消息,但由于返回了一个无效json格式的字符串,所以序列化效果不佳。
public IConsumer<string, GenericRecord> Consumer =>
new ConsumerBuilder<string, GenericRecord>(_consumerConfig)
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(
new CachedSchemaRegistryClient(_schemaRegistryConfig)).AsSyncOverAsync())
.Build();
var consumer = _kafkaClienteConsumerFactory.Consumer;
consumer.Subscribe(_configuration["Kafka:Topic"]);
result = consumer.Consume();
Mensagens.Add(result.Message.Value.ToString());
有些人使用双重序列化-反序列化方法(原始记录->通用记录->特定记录(,如本文和本期对话中所述。
此外,您可以使用我的多模式Avro反序列化程序GitHub存储库NuGet包。Confluent的.NET客户端存储库的第三方库中提到了这一点。
示例:
IConsumer<string, ISpecificRecord> consumer =
new ConsumerBuilder<string, ISpecificRecord>(_consumerConfig)
.SetValueDeserializer(new MultiSchemaAvroDeserializer(
new CachedSchemaRegistryClient(_schemaRegistryConfig)).AsSyncOverAsync())
.Build();
consumer.Subscribe(_configuration["Kafka:Topic"]);
var result = consumer.Consume();
List<ISpecificRecord> Mensagens = new List<ISpecificRecord>();
Mensagens.Add(result.Message.Value);
此外,您还可以查看kafka流库。它提供了一个类似的多类型AvroSerializer反序列化程序和分配器。