在StreamExecutionEnvironment
上调用execute
并启动流作业之前,有没有办法以编程方式确定作业是否已从保存点还原?我需要知道这些信息,以便在构建作业图时可以根据它设置 Kafka 源的偏移量。
似乎具有方法initializeState
FlinkConnectorKafkaBase
类可以访问此类信息(代码(。但是,由于initializeState
是一种final
方法,因此无法截获FunctionInitializationContext
并检索isRestored()
值。此外,initializeState
方法在执行作业图后被调用,因此我认为没有与之相关的可行解决方案。
我所做的另一个尝试是找到一个 Flink 作业参数,该参数指示作业是否从保存点启动。但是,我认为不存在这样的参数。
您只需执行以下操作即可获得所需的效果:
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();
如果使用 setStartFromLate,那么 Flink 将忽略存储在 Kafka 中的偏移量,而是从最早的记录开始读取。此外,即使你使用 setStartFromEarliest,如果 Flink 从检查点或保存点恢复,它也会改用存储在该快照中的偏移量。
请注意,Flink 会执行自己的 Kafka 偏移量管理,当从检查点恢复时会忽略存储在 Kafka 中的偏移量。Flink 这样做是作为提供精确一次保证的一部分,这需要确切地知道消耗了多少输入来产生在检查点或保存点中捕获的其余状态中存在的结果。出于这个原因,Flink 总是将偏移量存储为每个状态快照(检查点或保存点(的一部分。
这记录在这里和这里。
至于你最初的问题initializeState
,如果你实现CheckpointedFunction
接口,这是可用的,但实际需要这个是相当罕见的。