处理批处理使用者中@KafkaListener错误的提交



我们有一个Kafka Consumer设置,如下所示

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> batchFactory(
final ConsumerFactory<String, Object> consumerFactory,
@Value("${someProp.batch}") final boolean enableBatchListener,
@Value("${someProp.concurrency}") final int consumerConcurrency,
@Value("${someProp.error.backoff.ms}") final int errorBackoffInterval
) {
final SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
errorHandler.setBackOff(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS));
final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
containerFactory.setConsumerFactory(consumerFactory);
containerFactory.getContainerProperties().setAckMode(MANUAL_IMMEDIATE);
containerFactory.getContainerProperties().setMissingTopicsFatal(false);
containerFactory.setBatchListener(enableBatchListener);
containerFactory.setConcurrency(consumerConcurrency);
containerFactory.setBatchErrorHandler(errorHandler);
return containerFactory;
}
someProp:
concurrency: 16
batch: true
error.backoff.ms: 2000
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
consumer:
groupId: some-grp
autoOffsetReset: earliest
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: ${SCHEMA_REGISTRY_URL}
specific.avro.reader: true
security.protocol: SSL

在用@KafkaListener注释的批处理侦听器方法中,我们在列表处理结束时调用acknowledgment.acknowledge()。假设当服务启动时,我已经在主题中有一百万条消息可供服务使用,我对此方案有以下问题,因为我找不到详细讨论批处理侦听的文档:

  1. 侦听器将在列表中读取 500 条消息。 500,因为未设置max.poll.records,因此默认为 500,因此列表将包含 500 条消息。这种理解正确吗?
  2. 鉴于上述情况,消费者并发性在哪里出现?所述配置是否意味着我将有 16 个使用者,每个使用者可以从同一主题并行读取 500 条消息?
  3. 明白,在这种情况下,我必须至少有 16 个分区才能利用所有消费者,否则我会留下什么都不做的消费者?
  4. 由于SeekToCurrentBatchErrorHandler,如果侦听器方法内部的处理有任何异常,批处理将被重放。因此,如果在特定批次中处理第 50 条消息时出现异常,将再次播放前 49 条消息(基本上是重复的,我很好(,接下来的 50 到 500 条消息将被播放并尝试照常处理。这种理解正确吗?
  5. 如果连续读取多个批处理并且特定的使用者线程卡在SeekToCurrentBatchErrorHandler上,则如何处理偏移量提交,因为其他使用者线程仍将成功处理消息,从而将偏移指针向前移动,然后卡住的使用者偏移量
  6. MANUAL_IMMEDIATE状态的文档
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,

这是否意味着调用acknowledgment.acknowledge()是不够的,必须以某种方式使用AcknowledgingMessageListener?如果是,首选方法是什么。

  1. 你会得到"最多"500;不能保证你会得到500。

  2. 是;16 个使用者(假设您至少有 16 个分区(。

  3. 正确。

  4. 正确;但是 2.5 版现在有RecoveringBatchErrorHandler,你可以抛出一个特殊的异常来告诉它错误在批处理中发生的位置;它将提交成功记录的偏移量并寻找剩余的偏移量。

  5. 使用者
  6. 获得唯一的分区,因此"卡住"的使用者对其他使用者没有影响。

  7. 我不确定你在那里问什么;如果你打电话给ack.acknowledge()你已经在使用AcknowledgingMessageListener(@KafkaListener总是有这个能力;我们只用手动确认模式填充ack。

但是,对于此用例,您确实不需要使用手动确认;当侦听器正常退出时,容器将自动提交偏移量;无需不必要地使您的代码复杂化。

最新更新