c# ConfluentKafka SetValueDeserializer对象反序列化



在我的消费者中,我想反序列化Kafka protobuf消息。键是字符串类型,但消息值是一个protobuf对象。我知道我必须为消息值创建自己的自定义反序列化器,但不知道如何创建一个。下面是我的消费者实现,我需要替换标记行:

using Confluent.Kafka;
using System;
using System.Threading;
namespace EventHubsForKafkaSample
{
class Worker1
{
public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation)
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SocketTimeoutMs = 60000,                //this corresponds to the Consumer config `request.timeout.ms`
SessionTimeoutMs = 30000,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connStr,
SslCaLocation = cacertlocation,
GroupId = consumergroup,
AutoOffsetReset = AutoOffsetReset.Earliest,
BrokerVersionFallback = "1.0.0",        //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
//Debug = "security,broker,protocol"    //Uncomment for librdkafka debugging information
};
using (var consumer = new ConsumerBuilder<string, ProtobufMessage>(config)
.SetKeyDeserializer(Deserializers.Utf8)
.SetValueDeserializer(Deserializers.Utf8) //<<-----
.Build())
{
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
consumer.Subscribe(topic);
Console.WriteLine("Consuming messages from topic: " + topic + ", broker(s): " + brokerList);
while (true)
{
try
{
var msg = consumer.Consume(cts.Token);
Console.WriteLine($"Received: '{msg.Value}'");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
catch (Exception e)
{
Console.WriteLine($"Error: {e.Message}");
}
}
}
}
}
public class ProtobufMessage
{
public DateTime timestamp { get; set; }
public int inputId { get; set; }
public double? value { get; set; }
public int sourceId { get; set; }
public string inputGuid { get; set; }
}
}
Protobuf消息格式:
syntax = "proto3";
package ileco.chimp.proto;
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
option java_package = "ileco.chimp.proto";
option java_outer_classname = "FinalValueProtos";
message FinalValue {
google.protobuf.Timestamp timestamp = 1;
uint32 inputId = 2;
google.protobuf.DoubleValue value = 3;
uint32 sourceId = 4;
string inputGuid = 5;
}
  1. 您需要使用protoc从模式生成c#类

  2. 如果使用模式注册表,则不需要自己的反序列化器。参见示例代码

    using (var consumer =
    new ConsumerBuilder<string, YourProtoMessage>(consumerConfig)
    .SetValueDeserializer(new ProtobufDeserializer<YourProtoMessage>().AsSyncOverAsync())
    

如果您不使用模式注册表,那么需要通过实现IDeserializer来定义自己的反序列化器,如另一个答案

中所述

你需要一个类来实现Kafka API文档中定义的IDeserializer<T>接口。然后,您的标记行将类似于:

.SetValueDeserializer(new MyCustomDeserializer())

使用Protobuf.net自定义序列化器的示例

using Confluent.Kafka;
using System.IO;
using SerializationContext = Confluent.Kafka.SerializationContext;
namespace Common;
/// <summary>
/// Kafka protobuf serializer.
/// </summary>
/// <typeparam name="T">Type to serialize. </typeparam>
public class KafkaProtobufSerializer<T> : ISerializer<T> where T : class
{
/// <inheritdoc/>
public byte[] Serialize(T data, SerializationContext context)
{
using var ms = new MemoryStream();
ProtoBuf.Serializer.Serialize<T>(ms, data);
return ms.ToArray();
}
}
/// <summary>
/// Protobuf deserializer.
/// </summary>
/// <typeparam name="T">Type to deserialize.</typeparam>
public class KafkaProtobufDeserializer<T> : IDeserializer<T> where T : class, new()
{
/// <inheritdoc/>
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || data.IsEmpty)
{
return new T();
}

return ProtoBuf.Serializer.Deserialize<T>(data);
}
}

消费者使用示例

var config = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
GroupId = groupId,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
};
var consumer = new ConsumerBuilder<TKey, TValue>(config)
.SetValueDeserializer(new KafkaProtobufDeserializer<TValue>())
.SetErrorHandler((_, e) => workerLogger.LogError("Consumer Builder:{reason}", e.Reason))
.Build();

最新更新