使用 fromChangelogStream 时"Cannot map checkpoint/savepoint state for operator"



我想使用保存点机制将现有作业从Flink的一个版本移动到另一个版本,方法是:

  1. 使用保存点停止作业
  2. 在新版本上从保存点创建新作业

直到Flink 1.14我没有问题,但在Flink 1.15.1中,它失败了。即使不更改版本并停留在1.15.1中,它也会失败。我得到了这个错误,这意味着它无法将状态从上一个作业映射到新作业,因为有一个操作员:

Failed to rollback to checkpoint/savepoint hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c to the new program, because the operator is not available in the new program.

经过研究,有问题的运算符对应于我没有显式创建的ChangelogNormalize运算符。它是因为我使用tableEnv.fromChangelogStream(stream, schema, ChangelogMode.upsert())而生成的(upstart模式很重要,其他模式不会失败(。创建的表使用SQL API传递给SQL查询,该查询生成以下内容:

ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam -> [my_sql_transformation] -> [my_sink]

在以前版本的Flink中,这个操作符总是被赋予相同的uid,因此从保存点开始时状态可以匹配。在Flink 1.15.1中,每次都会生成不同的uid。我找不到手动设置uid的可靠方法。我找到的唯一方法是从转型中倒退:

dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");

但我希望有更好的方法来做到这一点。

你知道我做错了什么吗?会不会是Flink中的一个bug?

在JIRA上打开一个问题后,它似乎确实是一个bug。

目前的一个解决方法是将table.exec.legacy-transformation-uids设置为true

相关内容

  • 没有找到相关文章

最新更新