Unable to savepoint/checkpoint Flink state to an AWS S3 bucket



I am trying to checkpoint/savepoint the state of a Flink job running on EMR to an S3 bucket on AWS. Please note:

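  • The instances (master and core nodes) have an IAM role correctly set up to access the S3 bucket and all directories/files in it (the AmazonS3FullAccess policy is attached to the role, and nothing overrides it).
  • I can successfully copy files to the bucket from both the slave and master nodes with aws s3 cp xxx s3://flink-bc/checkpoints
  • Savepoints/checkpoints work when using HDFS
  • If I configure checkpoints to use HDFS and then try to take a savepoint to S3, the savepoint operation fails with the following error: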
org.apache.flink.util.FlinkException: Triggering a savepoint for the job 16c162c47f225cddad974056c9494b6d failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.........
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) 

JobManager log:

java.io.IOException: Cannot instantiate file system for URI: s3://flink-bc/savepoints
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.initializeLocationForSavepoint(AbstractFsCheckpointStorage.java:147)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpoint(CheckpointCoordinator.java:511)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:370)
at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:951)
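For reference, the setup described in the bullets above roughly corresponds to the flink-conf.yaml excerpt below. This is a minimal sketch under stated assumptions: the HDFS checkpoint directory is hypothetical (the actual path was not given), and only the s3://flink-bc/savepoints target and the job ID are taken from the logs above.

```yaml
# flink-conf.yaml (excerpt): hypothetical reconstruction of the setup above.
# Checkpoints are written to HDFS, which works.
state.backend: filesystem
# Assumed path; the real HDFS directory was not stated.
state.checkpoints.dir: hdfs:///flink/checkpoints

# The savepoint is then triggered with an explicit S3 target directory,
# which is the step that fails with "Cannot instantiate file system for URI":
#   flink savepoint 16c162c47f225cddad974056c9494b6d s3://flink-bc/savepoints
```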

I ran into a similar issue when using the latest Flink version (1.10.0) with S3 to store checkpoints in an S3 bucket.

So please see the detailed working answer I have provided here.
