When we run Flink on YARN, completed/cancelled/failed jobs are stored in the job archive. For example, we have the following job archives on HDFS. Is there any pointer on how these job archives are generated and stored on HDFS?
-rw-r--r-- 3 aaaa hdfs 10568 2019-07-09 18:34 /tmp/flink/completed-jobs/f909a4ca58cbf1d233a798f7de9489e0
-rw-r--r-- 3 bbbb hdfs 9966 2019-06-20 22:08 /tmp/flink/completed-jobs/fa1fb72ea43348fa84232e7517ca3c91
-rw-r--r-- 3 cccc hdfs 12487 2019-06-26 20:45 /tmp/flink/completed-jobs/fa2b34566384ec621e0d05a2073b8e90
-rw-r--r-- 3 dddd hdfs 57212 2019-07-16 00:41 /tmp/flink/completed-jobs/fa76acb920eec0880a986fb23fbb9149
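My guess is that the JobManager writes them there based on the archiving options in flink-conf.yaml, roughly like the sketch below (the directory is taken from the listing above; the HistoryServer entries are my assumption about how these files are later served):

# flink-conf.yaml (sketch)
jobmanager.archive.fs.dir: hdfs:///tmp/flink/completed-jobs
historyserver.archive.fs.dir: hdfs:///tmp/flink/completed-jobs
historyserver.archive.fs.refresh-interval: 10000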
I found a related file in the Flink repository:
https://github.com/apache/flink/blob/57a2b754f6a5d8844aa35afb511901ad7ee43068/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java#L71
HistoryServerArchivist is called from flink/runtime/dispatcher/Dispatcher.java:
https://github.com/apache/flink/blob/57a2b754f6a5d8844aa35afb511901ad7ee43068/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L126
@Override
public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) {
    try {
        // archiveJsonWithPath() turns the execution graph into a collection of JSON
        // responses, which FsJobArchivist writes to <archivePath>/<jobId>.
        FsJobArchivist.archiveJob(
            archivePath,
            executionGraph.getJobID(),
            jsonArchivist.archiveJsonWithPath(executionGraph));
        return CompletableFuture.completedFuture(Acknowledge.get());
    } catch (IOException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
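To see what actually gets stored in those files, I assume the same FsJobArchivist can read an archive back, roughly like the sketch below (the file name is just one of the job IDs from the listing above; having flink-runtime and the Hadoop filesystem on the classpath is assumed):

import java.util.Collection;

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;

public class InspectArchive {
    public static void main(String[] args) throws Exception {
        // One of the archive files from the listing above; the file name is the job ID.
        Path archive = new Path("hdfs:///tmp/flink/completed-jobs/f909a4ca58cbf1d233a798f7de9489e0");

        // Each entry should be the JSON response for one REST endpoint of the finished job.
        Collection<ArchivedJson> entries = FsJobArchivist.getArchivedJsons(archive);
        for (ArchivedJson entry : entries) {
            System.out.println(entry.getPath());
            System.out.println(entry.getJson());
        }
    }
}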