我想使用保存点机制将现有作业从Flink的一个版本移动到另一个版本,方法是:
- 使用保存点停止作业
- 在新版本上从保存点创建新作业
直到Flink 1.14我没有问题,但在Flink 1.15.1中,它失败了。即使不更改版本并停留在1.15.1中,它也会失败。我得到了这个错误,这意味着它无法将状态从上一个作业映射到新作业,因为有一个操作员:
Failed to rollback to checkpoint/savepoint hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c to the new program, because the operator is not available in the new program.
经过研究,有问题的运算符对应于我没有显式创建的ChangelogNormalize
运算符。它是因为我使用tableEnv.fromChangelogStream(stream, schema, ChangelogMode.upsert())
而生成的(upstart模式很重要,其他模式不会失败(。创建的表使用SQL API传递给SQL查询,该查询生成以下内容:
ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam -> [my_sql_transformation] -> [my_sink]
在以前版本的Flink中,这个操作符总是被赋予相同的uid,因此从保存点开始时状态可以匹配。在Flink 1.15.1中,每次都会生成不同的uid。我找不到手动设置uid的可靠方法。我找到的唯一方法是从转型中倒退:
dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");
但我希望有更好的方法来做到这一点。
你知道我做错了什么吗?会不会是Flink中的一个bug?
在JIRA上打开一个问题后,它似乎确实是一个bug。
目前的一个解决方法是将table.exec.legacy-transformation-uids
设置为true
。