在 Flink 中如何生成作业归档?



当我们在纱线上运行 Flink 时,已完成/终止/失败的作业存储在作业实现中。例如,我们在 hdfs 上有以下作业存档。关于这些作业归档是如何在 hdfs 上生成和存储的指示吗?

-rw-r--r--   3 aaaa   hdfs      10568 2019-07-09 18:34 /tmp/flink/completed-jobs/f909a4ca58cbf1d233a798f7de9489e0
-rw-r--r--   3 bbbb   hdfs       9966 2019-06-20 22:08 /tmp/flink/completed-jobs/fa1fb72ea43348fa84232e7517ca3c91
-rw-r--r--   3 cccc   hdfs      12487 2019-06-26 20:45 /tmp/flink/completed-jobs/fa2b34566384ec621e0d05a2073b8e90
-rw-r--r--   3 dddd   hdfs      57212 2019-07-16 00:41 /tmp/flink/completed-jobs/fa76acb920eec0880a986fb23fbb9149

在 Flink 存储库中找到了一个相关文件:

https://github.com/apache/flink/blob/57a2b754f6a5d8844aa35afb511901ad7ee43068/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java#L71

HistoryServerArchivist是从 flink/运行时/调度程序/调度程序调用的.java

https://github.com/apache/flink/blob/57a2b754f6a5d8844aa35afb511901ad7ee43068/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L126

@Override
public CompletableFuture<Acknowledge> archiveExecutionGraph(
AccessExecutionGraph executionGraph) {
try {
FsJobArchivist.archiveJob(archivePath, executionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(executionGraph));
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (IOException e) {
return FutureUtils.completedExceptionally(e);
}
}

相关内容

  • 没有找到相关文章

最新更新