可能快照机制在 Apache Flink 中花费越来越多的内存



我正在学习快照机制在 Flink 中的工作原理。

据我了解,JobManager 将以固定的间隔将障碍插入到每个数据源中,并且每个运算符在收到来自其所有数据源的第 n 个障碍后将执行快照。

如果我是对的,似乎这种机制在某些情况下可能会使用越来越多的记忆。

下面是一个示例:

假设有两个数据源:Source 1Source 2,以及一个运算符。

Source 1 -----
------ Operator
Source 2 -----/

Source 1正在生成整数流:1、2、3、4、5...

Source 2正在生成字符流:A、B、C、D、E...

运算符这样做:它需要来自Source 1的两个输入和一个来自Source 2的输入来生成输出:1a2、3b4、5c6、7d8...

假设 JobManager 插入了两个数据源的障碍,如下所示:

1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...

现在让我们开始吧。

Source 1Source 2两个"屏障A"进入操作员时,Flink 将为操作员制作快照,其当前状态为1a,因为当屏障 A 进入操作员时,1a一直在操作员中。

然后,当两个"屏障 B"进入算子时,算子完成了它的第一个任务:生成1a2,Flink 会制作另一个快照:NAbNA意味着目前没有来自Source 1的新输入。

同时,每个快照都会存储在 RAM、FS 或 RocksDB 中(取决于我们如何配置 Flink(。

如果我是对的,我认为 Flink 在这个例子中会生成越来越多的快照。因为Source 1的消耗速度总是Source 2的两倍。

我是不是误会了什么?

有趣的思想实验。

如果你限制自己只使用 Flink API 的标准部分,就没有办法实现一个用户函数,对于从源 2 读取的每个输入,它将从源 1 读取两个输入。例如,在实现CoProcessFunction时,您将受 Flink 运行时的摆布,它将根据其自己的内部逻辑提供来自任一流的事件。这两个流将相互竞争,可能在不同的线程中运行,甚至在不同的进程中运行。当流收敛时,如果来自两个输入的事件没有按照您希望的顺序提供,则必须在 Flink 状态下缓冲它们,直到准备好处理它们。

这可能导致大量缓冲需求的常见情况是在实现事件时间联接时,其中一个流在时间戳方面远远领先于另一个流(例如,如果汇率流滞后,则使用交易时有效的汇率加入外汇汇率的金融交易(。但是这种缓冲可以在 RocksDB 中完成,并且不必对内存施加压力。

请注意,这种状态缓冲完全发生在您的应用程序中 - Flink 没有灵活的网络缓冲区,这些缓冲区在背压期间会膨胀。

另一点是快照永远不会存储在本地文件系统或 RocksDB 中。如果您选择使用 RocksDB 状态后端,则每个任务管理器的活动工作状态将存储在本地 RocksDB 实例中,但状态备份(快照(将存储在分布式文件系统中。

至于你这样描述的情况,

1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...

这不会发生。没有什么会安排这两个源以这种方式同步 - 它们将比这张图所暗示的更加独立。因为 Flink 在流水线阶段之间只有少量的、固定数量的网络缓冲,所以执行图中发生的任何背压都会迅速传播回一个或两个源。发生这种情况时,背压源将无法将任何事件推送到管道中,直到背压缓解 - 但与此同时,另一个源可能会继续取得进展。屏障将由两个源几乎同时独立地插入到两个流中,但如果源 2 遇到频繁的背压(例如(,它可能看起来更像这样:

1, BARRIER, A, 2, B, 3, BARRIER, C, 4, D, BARRIER, 5 ...
a, BARRIER, A, BARRIER, b, B, BARRIER, BARRIER, c ...

最新更新