我们有一个Kafka Consumer设置,如下所示
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> batchFactory(
final ConsumerFactory<String, Object> consumerFactory,
@Value("${someProp.batch}") final boolean enableBatchListener,
@Value("${someProp.concurrency}") final int consumerConcurrency,
@Value("${someProp.error.backoff.ms}") final int errorBackoffInterval
) {
final SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
errorHandler.setBackOff(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS));
final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
containerFactory.setConsumerFactory(consumerFactory);
containerFactory.getContainerProperties().setAckMode(MANUAL_IMMEDIATE);
containerFactory.getContainerProperties().setMissingTopicsFatal(false);
containerFactory.setBatchListener(enableBatchListener);
containerFactory.setConcurrency(consumerConcurrency);
containerFactory.setBatchErrorHandler(errorHandler);
return containerFactory;
}
someProp:
concurrency: 16
batch: true
error.backoff.ms: 2000
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
consumer:
groupId: some-grp
autoOffsetReset: earliest
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: ${SCHEMA_REGISTRY_URL}
specific.avro.reader: true
security.protocol: SSL
在用@KafkaListener
注释的批处理侦听器方法中,我们在列表处理结束时调用acknowledgment.acknowledge()
。假设当服务启动时,我已经在主题中有一百万条消息可供服务使用,我对此方案有以下问题,因为我找不到详细讨论批处理侦听的文档:
- 侦听器将在列表中读取 500 条消息。 500,因为未设置
max.poll.records
,因此默认为 500,因此列表将包含 500 条消息。这种理解正确吗? - 鉴于上述情况,消费者并发性在哪里出现?所述配置是否意味着我将有 16 个使用者,每个使用者可以从同一主题并行读取 500 条消息? 我
- 明白,在这种情况下,我必须至少有 16 个分区才能利用所有消费者,否则我会留下什么都不做的消费者?
- 由于
SeekToCurrentBatchErrorHandler
,如果侦听器方法内部的处理有任何异常,批处理将被重放。因此,如果在特定批次中处理第 50 条消息时出现异常,将再次播放前 49 条消息(基本上是重复的,我很好(,接下来的 50 到 500 条消息将被播放并尝试照常处理。这种理解正确吗? - 如果连续读取多个批处理并且特定的使用者线程卡在
SeekToCurrentBatchErrorHandler
上,则如何处理偏移量提交,因为其他使用者线程仍将成功处理消息,从而将偏移指针向前移动,然后卡住的使用者偏移量 MANUAL_IMMEDIATE
状态的文档
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,
这是否意味着调用acknowledgment.acknowledge()
是不够的,必须以某种方式使用AcknowledgingMessageListener
?如果是,首选方法是什么。
-
你会得到"最多"500;不能保证你会得到500。
-
是;16 个使用者(假设您至少有 16 个分区(。
-
正确。
-
正确;但是 2.5 版现在有
RecoveringBatchErrorHandler
,你可以抛出一个特殊的异常来告诉它错误在批处理中发生的位置;它将提交成功记录的偏移量并寻找剩余的偏移量。
使用者 获得唯一的分区,因此"卡住"的使用者对其他使用者没有影响。
我不确定你在那里问什么;如果你打电话给
ack.acknowledge()
你已经在使用AcknowledgingMessageListener
(@KafkaListener
总是有这个能力;我们只用手动确认模式填充ack。
但是,对于此用例,您确实不需要使用手动确认;当侦听器正常退出时,容器将自动提交偏移量;无需不必要地使您的代码复杂化。