Kafka如何在消息处理成功之前不移动偏移量



我们正在读取来自kafka主题的消息。我在(错误的)印象下,通过设置EnableAutoOffsetStore/enable.auto.offset。Store = false作为消费者,你可以选择何时移动偏移量

我们像这样使用

while (!cancellationToken.IsCancellationRequested)
{
try{       
var consumeResult = kafkaConsumer.Consume(cancellationToken);             
// process consumeResult.Message
kafkaConsumer.StoreOffset(consumeResult);
}
catch
{
// delay and re-try
} 
}

通过这种方式,如果遇到异常,可以重新读取消息。我们有1个生产者,1个消费者,1个主题,1个分区(和1个组)

这个行为应该如何实现?

如果你在process中捕获了一个异常,那么提交将永远不会发生。

您可以存储失败记录的分区+偏移量,并将Seek存储回该偏移量。

或者,您可以跳过主处理循环中的偏移量,将该记录写入死信主题,并将不同的消费者再次写入"处理">

最新更新