Updating the key specified in keyBy()



I have a Flink streaming job running in production, and I need to make a change to the main transformation code.

The code currently running in production looks like this:

stream
.filter(inboundData -> inboundData.hasToBeFiltered())
.uid("filtered-data")
.keyBy(data -> data.getMyStringKey())
.process(doSomething())
.uid("processed-inbound-data-id");

I need to change the way the keyBy() operator partitions the data, keying by a different property of the inboundData POJO. The property currently used is a String, while the new one is a Long.

So the new code would look like this:

stream
.filter(inboundData -> inboundData.hasToBeFiltered())
.uid("filtered-data")
.keyBy(data -> data.getMyLongKey())
.process(doSomething())
.uid("processed-inbound-data-id");

I made the change above and tried to submit the updated version of the job to my Flink cluster, restoring the operators' state from a savepoint taken before cancelling the old job, but I got the following error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedProcessOperator_632e4c67d1f4899514828b9c5059a9bb_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 5 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:324)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 7 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:268)
... 11 more

From the stack trace I can infer that the error is caused by changing the type of the key used in the keyBy() operator.

I tried fiddling with the code and searching Google for questions on this topic, but I couldn't find anything meaningful that hints at how to perform the change I need.

So my questions are:

  • Is the change I'm trying to make feasible without losing the saved state?
  • If so, can anyone give me a hint on how to perform such a change?

Thanks a lot.

I think you should be able to use the State Processor API (whose first version is about to be released as part of Flink 1.9) to write a DataSet program that reads the savepoint taken from the old version of the job and writes a new savepoint compatible with the new version.
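Below is a minimal sketch of such a DataSet program against the State Processor API (the flink-state-processor-api module, Flink 1.9+). It is not a drop-in solution: it assumes the keyed state registered inside doSomething() is a single ValueState<Long> named "count", that the savepoint/checkpoint paths and the max parallelism of 128 are placeholders, and that mapToLongKey() is a hypothetical way to derive getMyLongKey() from the old String key. All of these, as well as the class names used here, must be adapted to the real job; only the operator uid "processed-inbound-data-id" comes from the code above.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class RekeySavepointJob {

    // Plain holder for one key's state: the new Long key plus the state value.
    public static class KeyedEntry {
        public Long newKey;
        public Long count;
    }

    // Hypothetical mapping from the old String key to the new Long key; in practice you
    // need some way to recover getMyLongKey() for every key present in the savepoint.
    static Long mapToLongKey(String oldKey) {
        return Long.parseLong(oldKey);
    }

    // Reads the keyed state of the old operator. The ValueStateDescriptor is an assumption:
    // its name and type must match exactly what doSomething() registers.
    public static class Reader extends KeyedStateReaderFunction<String, KeyedEntry> {
        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<KeyedEntry> out) throws Exception {
            KeyedEntry entry = new KeyedEntry();
            entry.newKey = mapToLongKey(key);
            entry.count = countState.value();
            out.collect(entry);
        }
    }

    // Writes the same state back, this time under the Long key.
    public static class Writer extends KeyedStateBootstrapFunction<Long, KeyedEntry> {
        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(KeyedEntry entry, Context ctx) throws Exception {
            countState.update(entry.count);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load the savepoint taken from the old job (paths and state backend are placeholders).
        ExistingSavepoint oldSavepoint = Savepoint.load(
                env, "hdfs:///savepoints/old", new RocksDBStateBackend("hdfs:///checkpoints"));

        // Read the state of the operator with uid "processed-inbound-data-id", keyed by String.
        DataSet<KeyedEntry> entries =
                oldSavepoint.readKeyedState("processed-inbound-data-id", new Reader());

        // Re-key the same data by the Long key and bootstrap new keyed operator state.
        BootstrapTransformation<KeyedEntry> rekeyed = OperatorTransformation
                .bootstrapWith(entries)
                .keyBy(new KeySelector<KeyedEntry, Long>() {
                    @Override
                    public Long getKey(KeyedEntry entry) {
                        return entry.newKey;
                    }
                })
                .transform(new Writer());

        // Write a fresh savepoint the updated job can restore from; 128 is an assumed
        // max parallelism and must match the job's actual max parallelism.
        Savepoint.create(new RocksDBStateBackend("hdfs:///checkpoints"), 128)
                .withOperator("processed-inbound-data-id", rekeyed)
                .write("hdfs:///savepoints/rekeyed");

        env.execute("rekey savepoint");
    }
}

Once the new savepoint has been written, the updated streaming job (the one keying by getMyLongKey()) can be submitted restoring from it, e.g. flink run -s hdfs:///savepoints/rekeyed ..., and it should find its state keyed by the Long values instead of the Strings.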
