手动重置kafka偏移时,闪烁偏移变为不一致状态



我们有一个flink流应用程序,可以从kafka读取消息。由于某种原因,我们不得不从卡夫卡重置命令中将卡夫卡偏移重置为最新值,因为出现了大量堆积。我们希望flink应用程序跳过所有这些消息,从重置后的新消息开始。

问题是,由于flink在内部管理其偏移量,它不知道这次重置,现在它只从后向读取消息(重置前的偏移点(,现在也无法提交偏移量。因此,每次重新启动flink应用程序时,它都会再次从同一点读取。所以我们每次重启都有重复的消息。

我知道我们不应该在flinkkafka应用程序中手动重置偏移量。但我们如何从中恢复过来。

我已经尝试将auto.offset.config设置为最新,但它仍然会再次读取这些消息。

只有当Flink从故障中恢复或从保存点或检查点手动重新启动时,它才会使用检查点或保存点中记录的偏移量。

否则,Flink Kafka消费者将开始从消费者组在Kafka代理中提交的偏移量中读取,或者从您在代码中明确指定的偏移量(即(中读取

myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (msecs)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

我不知道如何使这些事实与你所报道的相一致。

最新更新