Kafka偏移量有时会在消费者重新启动后重置



我有一个Kafka主题,它有4个分区和4个具有相同Consumer Group ID的使用者。所有消费者每晚都会重新启动,第一次是凌晨4:00,第二次是凌晨4:15,其他时间是4:30和4:45(这是我们管理员的规定,我对此无能为力(。某些分区的某些天数偏移量重置,消费者在重新启动后读取所有以偏移量0开始的消息。有些日子它不会发生。你知道问题出在哪里吗?消费者正在积极行动。

Kafka配置:

conf := kafka.ReaderConfig{
Brokers:  cfg.Kafka.BootstrapServers,
GroupID:  cfg.Kafka.ConsumerGroup,
Topic:    cfg.Kafka.Topic,
MinBytes: 10e3, // 10KB
MaxBytes: 32e6, // 10MB
}

阅读信息:

for {
m, err := reader.ReadMessage(context.Background())
if err != nil {
go log.Error(componentName, actionName, -123753, err.Error(), "Ошибка чтения сообщения с кафки", "", "", string(m.Value), nil)
continue
}
log.Warn(componentName, actionName, "Обработка сообщения", "START", "", "", map[string]interface{}{"offset": m.Offset, "partition": m.Partition}, nil)
var msg model.KafkaMessage
err = json.Unmarshal(m.Value, &msg)
if err != nil {
go log.Error(componentName, actionName, -123754, err.Error(), "Ошибка парсинга сообщения", "", "", string(m.Value), nil)
continue
}
_service := service.NewService(log, cfg)
currentNumAttempts := 0 
maxNumAttempt := 3      
for currentNumAttempts < maxNumAttempt {
currentNumAttempts++
err = _service.UpdateNumberInColvir(msg)
if err != nil {
continue
}
break
}
}

根据您的评论And cleanup.policy is DELETE可能会发生以下情况。

__consumer_offsets主题跟踪使用者组的每个主题分区的最后提交偏移量。

cleanup.policy = delete表示您的__consumer_offsets主题分区日志在retention.ms之后删除。

因此,您的消费者在retention.ms期间没有消费任何消息,您的__consumer_offset主题将不会更新,偏移主题中的所有消息都将被删除。那么,在__consumer_offsets主题中,您的消费者群体没有任何轨迹。所以,卡夫卡认为它是作为一个新的消费群体来划分话题,从一开始就开始阅读消息。

注:对于__consumer.offsets主题,cleanup.policy应为compact

最新更新