我们在纱线上运行flink。我们正在执行灾难恢复测试,作为测试的一部分,我们手动终止了一个运行了flink应用程序的节点。一旦实例被恢复,应用程序就会进行多次尝试,每次尝试都会出现以下错误:
AM Container for appattempt_1602902099413_0006_000027 exited with exitCode: -1000
Failing this attempt.Diagnostics: Could not obtain block: BP-986419965-xx.xx.xx.xx-1602902058651:blk_1073743332_2508
file=/user/hadoop/.flink/application_1602902099413_0006/application_1602902099413_0006-flink-conf.yaml1528536851005494481.tmp
org.apache.hadoop.hdfs.BlockMissingException:
Could not obtain block: BP-986419965-10.61.71.85-1602902058651:blk_1073743332_2508 file=/user/hadoop/.flink/application_1602902099413_0006/application_1602902099413_0006-flink-conf.yaml1528536851005494481.tmp
at org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1053)at
org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:1036)at
org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:1015)at
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:647)at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:926)at
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:982)at
java.io.DataInputStream.read(DataInputStream.java:100)at
org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:90)at
org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:64)at
org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:125)at
org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:369)at
org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:267)at
org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)at
org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)at
org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)at
java.security.AccessController.doPrivileged(Native Method)at
javax.security.auth.Subject.doAs(Subject.java:422)at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)at
org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)at
org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)at
java.util.concurrent.FutureTask.run(FutureTask.java:266)at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at
java.util.concurrent.FutureTask.run(FutureTask.java:266)at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at
java.lang.Thread.run(Thread.java:748)For
more detailed output, check the application tracking page: http://<>.compute.internal:8088/cluster/app/application_1602902099413_0006 Then click on links to logs of each attempt.
有人能告诉我们HDFS中存储了什么内容吗?是否可以将其重定向到S3?
添加检查点相关设置:
StateBackend rocksDbStateBackend = new RocksDBStateBackend("s3://Path", true);
streamExecutionEnvironment.setStateBackend(rocksDbStateBackend)
streamExecutionEnvironment.enableCheckpointing(10000);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(60000);
streamExecutionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
streamExecutionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
我遇到了同样的问题,
我修复了hdfs站点的设置
{
"Classification": "hdfs-site",
"Properties": {
"dfs.client.use.datanode.hostname": "true",
"dfs.replication": "2",
"dfs.namenode.replication.min": "2",
"dfs.namenode.maintenance.replication.min": "2"
}
}
我认为hdfs在终止的节点上丢失了数据,所以我在多节点上复制数据。