我想在kubernetes上运行一个apache-flink(1.11.1(流应用程序。文件系统状态后端保存到s3。到s3的检查点正在中工作
args:
- "standalone-job"
- "-s"
- "s3://BUCKET_NAME/34619f2862ce3e5fc91d80eae13a434a/chk-4/_metadata"
- "--job-classname"
- "com.abc.def.MY_JOB"
- "--kafka-broker"
- "KAFKA_HOST:9092"
所以我面临的问题是:
- 我必须手动选择上一个状态目录。有没有可能让它变得更好
- 作业增加chk目录,但不使用检查点。意味着当我第一次看到事件时,我会抛出一个新事件,并将其存储到
ListState<String>
中。每当我通过Gitlab部署我的应用程序的新版本时,它会再次抛出此事件 - 当我定义了state.backend-to-filesystem时,为什么我必须在代码中显式启用检查点?
env.enableCheckpointing(Duration.ofSeconds(60).toMillis());
和env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
- 使用Ververica Platform:Community Edition可能会更开心,它将抽象级别提高到了不必处理该级别的细节的程度。它有一个API,设计时考虑到了CI/CD
- 我不确定我是否理解你的第二点,但在恢复过程中,你的工作会倒带并重新处理一些数据,这是正常的。Flink并不保证只进行一次处理,而是保证只执行一次语义:每个事件将只影响Flink管理的状态一次。这是通过回滚到最新检查点中的偏移量来完成的,并在消耗了所有达到这些偏移量的数据后,将所有其他状态回滚到原来的状态
- 在作业运行时,有一个状态后端作为存储作业工作状态的地方是必要的。如果不启用检查点,则工作状态将不会被检查点,并且无法恢复。然而,从Flink 1.11开始,您可以通过配置文件启用检查点,使用
execution.checkpointing.interval: 60000
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
有几种方法可以将工作负载部署到kubernetes、简单的YAML文件、Helm Chart和Operator。
升级有状态的Flink作业并不像升级无状态服务那么简单,您只需要更新二进制文件并重新启动即可。
升级Flink Job你需要获取一个保存点或获取最新的检查点目录,然后更新二进制文件,最后重新提交你的作业,在这种情况下,我认为简单的YAML文件和Helm Chart不能帮助你实现这一点,你应该考虑实现Flink Operator来完成升级工作。
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator