Spring kafka 不支持大型消息使用者



我正在使用 spring Kafka 来消费由LinkedIn大消息支持的 Kafka 客户端生成的消息

鉴于此 Kafka 客户端始终覆盖AUTO_OFFSET_RESET_CONFIG到无,如其构造函数所示。

private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
Auditor<K, V> consumerAuditor) {
_kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
byteArrayDeserializer,
byteArrayDeserializer);
}
Map<String, Object> configForVanillaConsumer() {
Map<String, Object> newConfigs = new HashMap<>();
newConfigs.putAll(this.originals());
newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
return newConfigs;
}

因此,一旦我开始使用批量提交并将ENABLE_AUTO_COMMIT_CONFIG设置为 false,它就会引发以下错误:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User 提供的侦听器 com.LinkedIn.kafka.clients.consumer.LiKafkaConsumerRebalanceListener for group document-event-consumer 在分区分配上失败 org.apache.kafka.clients.consumer.NoOffsetForPartitionException: 未定义的偏移量,分区没有重置策略:DocumentEvents-2 at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369( at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPosition(Fetcher.java:247( at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPosition(KafkaConsumer.java:1602( at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265( at com.LinkedIn.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403( at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447( at com.LinkedIn.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62( at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255( at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeed(AbstractCoordinator.java:339( at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303( at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286( at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030( at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995( at com.LinkedIn.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231( at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558( at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511( at java.util.concurrent.FutureTask.run(FutureTask.java:266( at java.lang.Thread.run(Thread.java:745(

发生此问题的原因是,这是此使用者组首次使用本主题中的消息,因此它会尝试使用偏移重置策略。

虽然我将其设置为"最早",但它被底层LinkedIn kafka 客户端覆盖为"无">

在这种情况下,我还尝试覆盖ConsumerRebalanceListener以手动寻找开头,但实际上它并没有达到这一点。

如何解决此问题?

有趣;请在 GitHub 中打开一个问题。

如果策略none,我们应该捕获该异常。

同时,您可以通过仅使用常规客户端一次来解决此问题,以实际为组设置初始偏移量(您不必实际接收任何消息,只需分配分区并设置组的初始位置(。

相关内容

  • 没有找到相关文章