ApacheFlink:接收器是否将检查点期间从流缓冲的项目存储到检查点状态



我正在开发一个遗留的Flink管道,我们想更改我们正在使用的接收器的实现。我们正在运行Flink 1.10,并试图从BucketingSink过渡到StreamingFileSink,两者都在将ORC写入同一目的地。我们的管道非常简单:我们将一些kaffa流合并到我们的单个水槽中(没有其他运营商(。

在部署测试过程中,我注意到,当我们从sink1切换到sink2(反之亦然(时,我们在编写的文件中会丢失kafka消息(通过hive/trino查询(。丢失消息的kafka时间戳与我的部署一致,所以我相信这不是一些无关的上游问题。

我目前的理论是,在检查点期间,接收器在等待所有检查点屏障时缓冲来自流的消息,并且这些缓冲的事件被捕获为该接收器的检查点状态的一部分,kafka源认为这些偏移已经被传递/处理(即使它们没有被写入文件,但只存在于接收器的缓冲区中(。因此,当我使用不同的接收器进行部署,并从使用旧接收器创建的检查点开始时,那些缓冲的消息就会丢失。我正在寻找确认,这些接收器是否将缓冲的事件写入检查点状态,并将导致kafka源将其视为"已处理",即使它们尚未写入文件。

我们的时间线看起来像:

┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐  ┌────────┐  ┌──────┐
│Sink1 │  │Sink1 │  │Sink1 │  │Deploy│  │ Resume │  │Sink2 │
│ CP1  │─▶│ CP2  │─▶│ CP3  │─▶│Sink2 │─▶│from CP3│─▶│ CP4  │
└──────┘  └──────┘  └──────┘  └──────┘  └────────┘  └──────┘

我们最终在写入ORC文件的kafka消息中出现了一个缺口;Sink1 CP3";以及";Sink2 CP4";。因此,我相信我们的卡夫卡来源中的卡夫卡偏移量正在增加(尽管我们的来源没有任何变化(,所以卡夫卡来源认为我们已经处理了这些缓冲消息,在我们从CP3恢复后不会将它们发送到Sink2。更奇怪的是:如果我回到Sink1并从CP4+恢复,那么CP3和CP4之间丢失的事件就会被写入!并且没有写入重复的事件,因此不会将kafka源倒带到旧的偏移量,也不会在CP3之后重新处理所有消息。

那么,我是否走在了正确的轨道上,而卡夫卡消息源已经为缓冲消息提前了偏移量?有没有一种方法可以安全地从一个接收器转换到另一个接收器,而不会丢失这些kafka消息的窄片段?

是的,在检查点flink使用状态存储后端期间,检查保存点功能以避免丢失数据https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

什么是保存点?保存点与检查点有何不同#保存点是通过Flink的检查点机制创建的流作业执行状态的一致映像。您可以使用保存点来停止和恢复、分叉或更新Flink作业。保存点由两部分组成:一个目录,其中包含稳定存储中的(通常较大的(二进制文件(例如HDFS、S3…(和一个(相对较小的(元数据文件。稳定存储器上的文件表示作业执行状态映像的网络数据。保存点的元数据文件(主要(以相对路径的形式包含指向稳定存储中属于保存点的所有文件的指针。

最新更新