我在使用MemoryStateBackend的独立集群上有一个flink流应用程序。Kryo的TaggedFieldSerializer被用作默认序列化器。
当我更改状态的架构并重新部署应用程序时,我收到以下异常
Caused by: org.apache.flink.util.StateMigrationException: State migration isn't supported, yet.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:209)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:142)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createValueState(HeapKeyedStateBackend.java:234)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:315)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:312)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:392)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
如果有人向我建议解决方法,或者我应该使用 FsStateBackend 来解决这个问题,这将非常有帮助。
附言如果我想在 S3 上使用 FsStateBackend 来处理在独立集群上运行的 flink 应用程序,则必须进行哪些配置更改。
FsStateBackend
不会解决此问题,因为它也在引擎盖下使用HeapKeyedStateBackend
,这就是引发此异常的原因。
有FLIP-22可以帮助解决国家移民问题,但它尚未实施。
目前我听说过的最佳选择是使用基于 Avro 的序列化程序,因为它可以实现无缝处理新旧架构。但这不是胆小的人的事情。
关于 FsStateBackend 配置,请参阅此处的(写得很好(文档。