无法重新启动 Kafka 使用者应用程序,由于 OffsetOutOfRangeException 而失败



目前,我的 Kafka 消费者流应用程序正在手动将偏移量提交到 Kafka 中,enable.auto.commit设置为false。 当我尝试重新启动它时,应用程序失败,抛出以下异常:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}

假设上述错误是由于消息不存在/分区因保留期而被删除,我尝试了以下方法:

我禁用了手动提交并启用了自动提交(enable.auto.commit=trueauto.offset.reset=earliest) 它仍然失败并出现相同的错误

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}

请建议重新启动作业的方法,以便它可以成功读取存在消息/分区的正确偏移量

您正在尝试从主题partition的分区12读取偏移量155555555,但是由于保留策略,它可能已被删除。

您可以使用 Kafka Streams 应用程序重置工具来重置 Kafka Streams 应用程序的内部状态,以便它可以从头开始重新处理其输入数据

$ bin/kafka-streams-application-reset.sh
Option (* = required)         Description
---------------------         -----------
* --application-id <id>       The Kafka Streams application ID (application.id)
--bootstrap-servers <urls>    Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2
(default: localhost:9092)
--intermediate-topics <list>  Comma-separated list of intermediate user topics
--input-topics <list>         Comma-separated list of user input topics
--zookeeper <url>             Format: HOST:POST
(default: localhost:2181)

或者使用新的使用者组 ID 启动使用者。

我遇到了同样的问题,我在应用程序中使用包org.apache.spark.streaming.kafka010。一开始,我以为 auto.offset.reset 策略不起作用,但是当我在对象 KafkaUtils 中阅读方法修复 KafkaParams 的描述时,我发现配置已被覆盖。我想它调整执行器的配置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG的原因是保持驱动程序和执行器获得的一致偏移量。

最新更新