Kafka consumer infinite retry | Rebalance issue on scaling



我在Kafka消费者上面临一些奇怪的再平衡问题。我已经实现了一个无限重试策略的错误使用SeekToCurrentErrorHandler设置max.poll.interval.ms=1,200,000为20分钟,并设置重试延迟为900,000 15分钟延迟。
可以看到poll.interval>延迟。我面临的问题是,当一个新的消费者被添加时,它会重新平衡,旧的消费者离开群体,只有新的消费者接收和处理消息。在新的消费者中,我看到了日志Attempt to heartbeat failed since group is rebalancing,但它仍然处理消息。旧消费者接收到0数据。

我的消费者配置如下:

spring:
    kafka:
       ......
       ......
        consumer:
             max.poll.records: 150
             group.id: xxxxx
             properties:
                   enable.auto.commit: false
                   max.poll.interval.ms: 1200000 #20 minutes greater than retry interval

Kafka java config:

public ConcurrentkafkaListenerContainerFactory<String, byte[]> kafkaFactory() {
    
       ConcurrentkafkaListenerContainerFactory<String, byte[]> = new 
                                ConcurrentkafkaListenerContainerFactory();
        ......
        ......
   factory.setErrorHandler(kafkaErrorHandler);
   facory.setRetryTemplate(retrtTemplate());
   factory.setStatefulRetry(true)
   factory.getContainerProperties().setAckOnError(false);
   factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANIAL_IMMEDIATE);
   return factory;
  }

错误处理程序:

@Component
public class KafkaErrorHandler extends SeekToCurrentErrorHandler {
   KafkaErrorHanbdler(){super(-1)}
    @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, 
          ?> consumer, MessageListenerContainer container) {
            LOG.info("handle");
            super.handle(thrownException, records, consumer, container);
        }
    
}

我的应用程序需要大约10秒来处理每个事件,并在处理成功后才发送确认。它按照配置接收150条消息。最大轮询时间间隔设置为20分钟。kafka是否只在处理了150条消息后才进行轮询?

如果使用有状态重试,则抛出异常到容器,我们将重新查找未处理的记录并再次轮询。

DEBUG日志可以帮助你找出问题所在。

对于较新的版本(从2.3开始),您可以使用BackOff而不是重试模板来配置SeekToCurrentErrorHandler

添加一个新的消费者,而当前的一个是繁忙的将意味着重新平衡将被推迟,直到第一个再次投票。现有的消费者不会"离开"这个群体。直到超时。