我有一个flink作业,通过KafkaSource配置为侦听主题的正则表达式,类似于:
val topicPattern = "^(topic1|topic2|topic3)$"
Kafka消费者开始位置配置设置为startFromLatest,如下所示:
val myConsumer = new FlinkKafkaConsumer<>(topicPattern, someProperties);
myConsumer.setStartFromLatest();
我们通过配置传递topicPattern,有时会发生新的kafka生产者生成数据,比如topic4
,然后我们更新配置添加这个新主题并使用保存点重新启动作业。
在这种情况下,我们注意到kafka源从一开始就读取了这个新主题。有人能解释一下原因吗?Kafka的auto.offset.reset
属性开始起作用了吗?
据我所知,目前这就是FlinkKafkaConsumer
的工作方式,如果从保存点恢复所有不属于保存点的主题将自动设置EARLIEST
偏移量。这很可能是一个bug,所以我正在创建一个bug报告。