我想知道Flink创建的存储点(以及HashMapStateBackend的检查点(中的空数据文件是否是意料之中的事情,或者它们是否指向Flink本身、我们正在运行的作业或我们用于检查点和存储点的存储系统的问题。
以下是我为什么要问这个问题的一些上下文,以及在什么情况下可以观察到这样的空文件:
我们在生产中运行的Flink设置出现问题。在看似随机(但目前并不罕见(的情况下,flink无法从检查点或保存点进行恢复。
记录的错误消息可以归结为以下内容:
Caused by: org.apache.flink.core.io.VersionMismatchException: Incompatible version: found 0, compatible versions are [6, 5, 4, 3, 2, 1]
at org.apache.flink.core.io.VersionedIOReadableWritable.resolveVersionRead(VersionedIOReadableWritable.java:87)
at org.apache.flink.core.io.VersionedIOReadableWritable.read(VersionedIOReadableWritable.java:47)
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:139)
奇怪的是这个
发生在我们对作业代码没有任何更改的时间点(因此,没有关于需要保存在检查点或保存点中(和
";找到版本";所报告的是0。
更令人困惑的是,这种情况有时会发生,但并非总是如此。也就是说,在10个保存点中,6个是好的,4个是坏的(不可恢复(。
我试图将其与磁盘上的保存点相关联,我认为我看到了以下关联:
我们的坏保存点有一个或多个长度为零的数据文件(同一保存点中的其他数据文件的长度不为零(
我找不到太多关于保存点的二进制格式的信息。所以我的问题是:
- 保存点中的零长度数据文件是意料之中的事情还是出现问题的迹象
- 零长度文件是否会导致上述异常(即检测到版本0(
- 在哪里可以找到有关格式的信息(Flink源代码之外(?具体来说:版本是什么?它们是如何记录在数据中的?如何将
_metadata
文件转换为人类可读的形式
我几乎可以肯定,一个空的数据文件表明这里有问题:Flink将小于state.storage.fs.memory-threshold
的状态写入_metadata
。因此,如果有一个数据文件,它应该大于阈值。但很难说问题出在哪里。你检查过你的存储系统吗:那里有足够的空间吗?