为什么我在 Apache Flink 中使用 initializeState() 时得到一个 NullPointerEx



我在CheckpointedFuntion中使用运算符状态,但是我在初始化MapState时遇到了NullPointerException:

public void initializeState(FunctionInitializationContext context) throws Exception {
MapStateDescriptor<Long, Long> descriptor
= new MapStateDescriptor<>(
"state",
TypeInformation.of(new TypeHint<Long>() {}),
TypeInformation.of(new TypeHint<Long>() {})
);
state = context.getKeyedStateStore().getMapState(descriptor);
}

当我将"描述符"分配给getMapState()时,我得到了NullPointerException。

下面是堆栈跟踪:

java.lang.NullPointerException
at fyp.Buffer.initializeState(Iteration.java:51)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

我猜你遇到了一个 NPE,因为你试图访问这里记录的KeyedStateStore; 但是,由于您没有键控流,因此您的作业中没有可用的此类状态存储。

获取系统键/值状态的句柄。仅当函数在 KeyedStream 上执行时,才能访问键/值状态。在每次访问时,状态公开函数当前处理的元素的键的值。每个函数可能有多个分区状态,使用不同的名称进行寻址。

因此,如果您在未键入的上游(您不会这样做)上实现CheckpointedFunction(在此处记录),则应考虑访问运算符状态存储

snapshotMetadata = context.getOperatorStateStore.getUnionListState(descriptor)

运算符状态允许您在作业的每个并行实例中有一个状态,与每个状态实例依赖于键控流生成的键的键的键状态相反。

请注意,在上面的示例中,我们请求.getUnionListState,这将产生运算符状态的所有并行实例(格式化为状态列表)。

如果你寻找一个具体的例子,你可以给这个来源一个机会:它是一个实现算子状态的算子。

最后,如果你需要一个键控流,所以你可能会考虑将你的解决方案移近键控状态 Flink 后端。

相关内容

  • 没有找到相关文章

最新更新