我有以下代码从Kafka主题读取数据。我的目标是定期读取主题中最新的消息,因为我想在实时图表中使用这些数据。我写了下面的代码。但是如果我运行代码,我就会从过去的某个地方开始阅读(24小时前)。我想我需要在代码中定义偏移量之类的东西?我如何在Kafka Confluent消费者中做到这一点?
public void Read_from_Kafka()
{
try
{
var config = new ConsumerConfig
{
BootstrapServers = kafka_URI,
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.Ssl,
SslCaLocation = "path1",
SslCertificateLocation = "path2",
SslKeyLocation = "path3",
SslKeyPassword = "password",
};
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(topic_name);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
using (StreamWriter sw = File.AppendText(json_log_file))
{
sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
}
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
}
}
catch(Exception ex)
{
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
}
}
}
更新1
我已经尝试设置AutoOffsetReset = AutoOffsetReset。这是最新的,但我仍然在阅读过去的数据。我认为这个设置不足以达到我的目的。
您需要在消费者上调用Seek
函数以到达最高水位- 1。否则,从最新偏移量开始的消费者将只等待下一个生成的事件(可能永远不会出现)。
或者,使用Kafka Connect等不同的工具,例如,将数据写入Elasticsearch, InfluxDB等,您可以配置像Grafana这样的图形工具来显示来自这些系统的数据。
我不认为Seek方法在。net confluent-kafka-dotnet包中工作得很好,建议使用TopicPartitionOffset代替Assign。
如果您只有一个分区,即分区0,则下面的代码查找主题为"purchase_"的最后一条消息:
using Confluent.Kafka;
using System;
class Consumer
{
static void Main(string[] args)
{
ConsumerConfig config = new()
{
BootstrapServers = "localhost:9092",
GroupId = Guid.NewGuid().ToString(),
};
using IConsumer<string, string> consumer
= new ConsumerBuilder<string, string>(config).Build();
try
{
TopicPartition topicPartition = new("purchases", new Partition(0));
WatermarkOffsets watermarkOffsets
= consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(3));
TopicPartitionOffset topicPartitionOffset
= new(topicPartition, new Offset(watermarkOffsets.High.Value - 1));
consumer.Assign(topicPartitionOffset);
ConsumeResult<string, string> consumeResult
= consumer.Consume(TimeSpan.FromSeconds(3));
Console.Write($"Last message value = {consumeResult.Message.Value}, " +
$"position = {consumer.Position(topicPartition)}");
}
finally { consumer.Close(); }
}
}
显然,我在这里使用了超时而不是取消令牌,但是使用取消令牌不应该影响逻辑。
有一些警告,在某些情况下Kafka不会以这样简单的方式使用偏移量,并且简单地从分区的高水位偏移量中去掉一个可能并不总是有效。在我测试过的例子中,它工作得很好。
如果您有多个分区,可以扩展代码,通过迭代并将new Partition(0)
更改为适当的索引来查找每个分区中的最后一条消息,例如分区1的new Partition(1)
。对于多个分区,我不认为Kafka本身知道哪个是最后写入的消息。除非在分区内,否则不能保证顺序。但是,一旦您获得了每个分区中的最后一条消息,您就可以使用一些消息属性来解决这个问题(增加ID,时间戳)。