我遇到的问题是,当Kafka和Flink作业重新启动时,FlinkKafkaConsumer偏移量会重置为0,因此,即使我启用了检查点,并且在Flink作业中只启用了一次语义,数据也会被重新处理。
以下是我的环境详细信息
- 在Kubernetes下运行
- Kafka源主题有10个分区,没有复制
- 卡夫卡有3个经纪人
- Flink检查点是通过一次语义启用的
- Flink版本为1.12
- Flink通过负载均衡器连接到Kafka
只有当源主题有一个以上的分区,并且Flink无法连接到分布分区的所有3个代理时,才会出现此问题。只有Flink最初连接到的代理中不可用的分区的偏移量才会重置为0。如果我在所有3个代理都可用后等待重新启动Flink作业,则没有问题。以下是KafkaConsumer设置
auto.offset.reset:最早隔离级别:read_committed
我还在KafkaConsumer上启用了setStartFromGroupOffsets和setCommitOffsetsOnCheckpoint。
注意,我确保主题中的所有数据都被作业消耗掉了,Flink通过验证Kafka中提交的偏移量成功地检查了数据。我们非常感谢为解决这一问题提供的任何帮助。
我不知道这是Flink问题还是Kafka客户端问题。任何帮助都将不胜感激。
我认为,期望这样一个没有任何复制设置的设置提供任何保证是不现实的。