我有一个流,我想使用某个键对其进行分区,然后运行几个转换,每个转换使用一个状态。当我调用keyBy()
时,我得到一个KeyedStream
,下一个转换可以正确地访问分区状态,但是在那之后链接的另一个转换在试图访问分区状态时得到一个异常。例外是:
未在配置中配置状态键序列化器。此操作不能使用分区状态
似乎关键信息只传递给了第一个转换,而没有传递给链的其他部分。
我尝试运行的代码是沿着这段代码的行(但实际上做了一些事情):
DataStream<Long> newStream = eventsStream
.keyBy("username")
.filter(new RichFilterFunction<Event>() {
private ValueState<Boolean> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE1", Boolean.class, Boolean.TRUE));
}
@Override
public boolean filter(Event value) throws Exception {
return stateStore.value();
}
})
.map(new RichMapFunction<Event, Long>() {
private ValueState<Long> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE2", Long.class, 0L));
}
@Override
public Long map(Event value) throws Exception {
return Long.parseLong(value.data) + stateStore.value();
}
});
此代码将在第二次调用getState()
时抛出异常。
我可以再次调用keyBy()
,但随后我删除了链接操作的能力。我可以手动操作流图的对象以便传递关键信息吗?还是不支持这种类型的链接?
你不能。
即使您第二次调用keyBy()
(或以某种方式向下游传递"key-ed"信息),您也将获得一个新状态,因为一个状态仅与单个操作符相关联。
作为解决方法,您需要将两个操作符合并为一个。
如果您认为此功能可能有帮助,请随时在dev@flink.apache.org
提出建议。