我正在使用Debezium对我的DB执行CDC,以在Kafka中创建消息。使用kafdrop和OffsetExplorer等工具,我可以看到消息键和值。但是,在.NET框架应用程序中,使用Confluent.Kafka库,当我使用消息时,消息键始终为null。
如何使用Confluent.Kafka库检索消息密钥?
以下是VS项目的代码:
using System;
using Confluent.Kafka;
using System.Threading;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "kakfa:9092,localhost:9093",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
var topics = "CAC_connector.dbo.sessionLogs";
bool cancelled = false;
// Define the cancellation token.
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(topics);
while (!cancelled)
{
var cr = consumer.Consume(cancellationToken);
Console.WriteLine($"Consumed record with key {cr.Message.Key} and value {cr.Message.Value.Substring(0, 96)}");
}
consumer.Close();
}
}
}
}
干杯,Kyley
正如OneCricketer所指出的,我最初构建的ConsumerBuilder是为了忽略密钥。当我将构造函数更改为ConsumerBuilder<string, string>
时,密钥包含在Message中。