在 Apache Flink 中定义和执行我的流处理器的作业图之前,我想运行一些初始化代码,例如,用于创建 Kafka 主题,我将其用作作业图中的接收器。 但是,当流处理器从保存点还原时(例如,在流处理器更新期间(,不应运行此初始化代码。有没有办法以编程方式检查作业是否从保存点启动?
可以实现能够识别快照和恢复的 Flink 函数。您可以通过实现CheckpointedFunction
接口来执行此操作。然后,当调用initializeState(FunctionInitializationContext context)
时,您可以检查context.isRestored()
以确定作业是否从快照(即,从检查点或保存点(重新启动。
您可以采取的另一种方法是检查主题是否已存在,如果不存在,请继续创建它们,而不考虑作业是如何开始的。
从保存点重新启动作业时,必须指定保存点目录的路径。
我们通过以下方式执行此操作:
$ bin/flink run -s :savepointPath [:runArgs]
如果我正确理解您的问题,您所要做的就是验证是否指定了--fromSavepoint
或-s
(别名(。
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#resuming-from-savepoints