我在Kafka消费者上面临一些奇怪的再平衡问题。我已经实现了一个无限重试策略的错误使用SeekToCurrentErrorHandler
设置max.poll.interval.ms=1,200,000
为20分钟,并设置重试延迟为900,000
15分钟延迟。
可以看到poll.interval>延迟。我面临的问题是,当一个新的消费者被添加时,它会重新平衡,旧的消费者离开群体,只有新的消费者接收和处理消息。在新的消费者中,我看到了日志Attempt to heartbeat failed since group is rebalancing
,但它仍然处理消息。旧消费者接收到0数据。
我的消费者配置如下:
spring:
kafka:
......
......
consumer:
max.poll.records: 150
group.id: xxxxx
properties:
enable.auto.commit: false
max.poll.interval.ms: 1200000 #20 minutes greater than retry interval
Kafka java config:
public ConcurrentkafkaListenerContainerFactory<String, byte[]> kafkaFactory() {
ConcurrentkafkaListenerContainerFactory<String, byte[]> = new
ConcurrentkafkaListenerContainerFactory();
......
......
factory.setErrorHandler(kafkaErrorHandler);
facory.setRetryTemplate(retrtTemplate());
factory.setStatefulRetry(true)
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANIAL_IMMEDIATE);
return factory;
}
错误处理程序:
@Component
public class KafkaErrorHandler extends SeekToCurrentErrorHandler {
KafkaErrorHanbdler(){super(-1)}
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?,
?> consumer, MessageListenerContainer container) {
LOG.info("handle");
super.handle(thrownException, records, consumer, container);
}
}
我的应用程序需要大约10秒来处理每个事件,并在处理成功后才发送确认。它按照配置接收150条消息。最大轮询时间间隔设置为20分钟。kafka是否只在处理了150条消息后才进行轮询?
如果使用有状态重试,则抛出异常到容器,我们将重新查找未处理的记录并再次轮询。
DEBUG日志可以帮助你找出问题所在。
对于较新的版本(从2.3开始),您可以使用BackOff而不是重试模板来配置SeekToCurrentErrorHandler
。
添加一个新的消费者,而当前的一个是繁忙的将意味着重新平衡将被推迟,直到第一个再次投票。现有的消费者不会"离开"这个群体。直到超时。