I recently migrated Flink from 1.9.0 to 1.11.1 on a cluster without job HA. I am now facing the following error, which causes the JobManager to fail roughly every 5 minutes, so the Flink job running on AWS ECS is stuck in a restart loop. This worked in Flink 1.9.0 but no longer works after the upgrade to 1.11.1. Since I do not have JobManager HA, I generate a fixed --job-id for each Flink job instead of using the default all-zeros id. I am new to Flink.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 5.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 's3://data/flink/checkpoints/<unique_job_id>/chk-5/_metadata' already exists
The problem appears to be that the same job id is reused across multiple runs, which causes these conflicts. If you are not using HA, you should always generate a unique job id for each job run / job submission. The simplest way is to generate a random id. You only need to pin the job id if you want to recover a running job from state stored in the HA store.
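As a minimal sketch (assuming you can feed the generated value into whatever launches the job with --job-id, e.g. your ECS task definition), a fresh 32-character hex job id can be produced with Flink's own JobID class:

    import org.apache.flink.api.common.JobID;

    public class RandomJobId {
        public static void main(String[] args) {
            // JobID.generate() creates a random id; its hex form is the
            // format expected by the --job-id argument.
            JobID jobId = JobID.generate();
            System.out.println(jobId.toHexString());
        }
    }

Passing a fresh id like this on every submission avoids the chk-N path collision in S3, because each run writes checkpoints under its own <job_id> prefix; a fixed id is only useful when the JobManager should recover the same job from HA state.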