我们看到一些奇怪的行为,processWindow函数发出两条记录,第一记录包含使用窗口中存在的聚合数据的完整信息,而第二记录包含从记录中移除一些信息的部分信息。
processWindow函数使用的状态(MapState(如下:
override def open(parameters: Configuration): Unit = {
cfState = getRuntimeContext.getMapState(
new MapStateDescriptor[(String, Int), mutable.Map[Int, mutable.Set[Int]]] (
"customFieldsState",
classOf[(String, Int)],
classOf[mutable.Map[Int, mutable.Set[Int]]]
)
)
}
并且CCD_ 1函数使用窗口中存在的记录来操纵上述状态。
这是反模式吗?在processWindow
函数中使用状态?对于在processWindow
函数中使用state,是否有其他建议?
在这种情况下,我们需要维护状态,因为我们不会在单个窗口中捕获所有字段,并且我们需要聚合每个用户的记录,因此使用窗口函数。
感谢
如果您想在单个窗口实例的生存期之外保持状态,您应该使用
KeyedStateStore ProcessWindowFunction.Context#globalState
当窗口关闭时,所有其他状态都将被清除。
由于Flink从不清除globalState
,因此如果密钥过期,则应在使用的状态描述符上设置状态TTL,以避免随着时间的推移而泄露状态。