如何使用C#和Confluent.Kafka使用来自具有多个avro模式的主题的Kafka消息



我需要使用来自一个具有多个avro模式的主题的消息。

我使用c#libConfluent.SchemaRegistryConfluent.Kafka来生成我的消费者。

我试图在不传递avro模式的情况下使用GenericRecord类型来反序列化消息,但由于返回了一个无效json格式的字符串,所以序列化效果不佳。

public IConsumer<string, GenericRecord> Consumer =>
new ConsumerBuilder<string, GenericRecord>(_consumerConfig)
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(
new CachedSchemaRegistryClient(_schemaRegistryConfig)).AsSyncOverAsync())
.Build();    
var consumer = _kafkaClienteConsumerFactory.Consumer; 
consumer.Subscribe(_configuration["Kafka:Topic"]);
result = consumer.Consume();
Mensagens.Add(result.Message.Value.ToString());
带有Confluent.SchemaRegistry的Confluent.Kafka没有现成的功能。

有些人使用双重序列化-反序列化方法(原始记录->通用记录->特定记录(,如本文和本期对话中所述。

此外,您可以使用我的多模式Avro反序列化程序GitHub存储库NuGet包。Confluent的.NET客户端存储库的第三方库中提到了这一点。

示例:

IConsumer<string, ISpecificRecord> consumer =
new ConsumerBuilder<string, ISpecificRecord>(_consumerConfig)
.SetValueDeserializer(new MultiSchemaAvroDeserializer(
new CachedSchemaRegistryClient(_schemaRegistryConfig)).AsSyncOverAsync())
.Build();    
consumer.Subscribe(_configuration["Kafka:Topic"]);
var result = consumer.Consume();
List<ISpecificRecord> Mensagens = new List<ISpecificRecord>();
Mensagens.Add(result.Message.Value);

此外,您还可以查看kafka流库。它提供了一个类似的多类型AvroSerializer反序列化程序和分配器。

最新更新