我使用BroadcastState
在Flink中执行流计算。我为我的工作定义了一个扩展KeyedBroadcastProcessFunction
的类。假设我有一个由(user_id, location)
键控的流a,还有一个流B,它被广播给所有执行器,以使用我定义的类处理a中的元素。我知道我可以在这个类的processBroadcastElement
或processElement
中注册一个计时器,这样当计时器超时时,我可以通过调用state.clear()
来删除特定密钥组的关联状态。不知道在那之后,这个关键群体还存在吗?
例如,在流A中,一个新消息带有(user_id=1, location='usa')
,我们生成了这样的密钥组及其相关状态。之后,如果另一条带有(user_id=1, location='usa')
的消息到来,它将触发processElement()
并发出结果。
假设24小时后,我不再对这个密钥组(user_id=1, location='usa')
感兴趣,我可以注册一个计时器来清除相关状态,但我无法控制这个密钥组。因此,24小时后,当另一条具有(user_id=1, location='usa')
的消息到来时,由于该密钥组仍然存在,因此processElement()
仍将被调用。随着作业的运行,尽管它们的关联状态将在24小时后清除,但密钥组会累积吗?或者这不应该是内存使用的问题?
相关博客:https://www.da-platform.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
Flink的键控状态被组织为分布式(或分片)键值存储,其中键可以是简单的东西,如整数和字符串,也可以是复合物,如(user_id=1,location='usa')键组与组合键有所不同。密钥组是Flink 1.2(参见Flink-3755)中引入的一种运行时结构,以允许对密钥值状态进行有效的重新缩放。密钥组是密钥空间的一个子集,并作为一个独立的单元进行检查点检查。在运行时,同一密钥组中的所有密钥都在作业图中一起分区——每个子任务都有一个或多个完整密钥组的键值状态。此设计文档提供了更多详细信息。作为使用DataStream API的用户,密钥组是实现细节,而不是您直接使用的内容。
对于KeyedBroadcastProcessFunction
中的定时器,它们可以在processElement
或onTimer
方法中注册,但不能在processBroadcastElement
方法中注册。这是因为定时器总是与密钥相关联,并且没有与广播元素相关联的密钥。但是,在processBroadcastElement
方法期间,可以通过对KeyedBroadcastProcessFunction.Context
对象使用applyToKeyedState
方法来操作任何或所有关键帧状态。有关更多详细信息,请参阅文档。
一旦调用state.clear(),该键的状态条目就会被删除。当然,该键的新流事件可能在状态被清除后到达,如果您愿意,您可以再次存储该键的值状态。为了避免由于保持不再相关的键的状态而导致无限制的内存使用,您确实需要小心。您可能希望这样的逻辑在每次创建状态24小时后使其过期:
processElement:
if state.value() is null, register timer
state.update(...)
onTimer:
state.clear()
或者,您可能需要更复杂的逻辑,以便在更新或访问状态时延长状态的生存期。
另一种选择是使用状态生存时间功能。
更新:
无论何时在任何ProcessFunction类型的processElement
或onTimer
方法中,上下文中都隐含着一个特定的键,对键控状态(如.update()
或.clear()
)所做的任何操作都只会影响该键的状态。
广播状态的工作方式不同。广播状态始终为MapState,并复制到所有并行子任务中。广播状态是无键的——如果在processElement
方法期间读取广播状态,则无论调用期间上下文中的键是什么,都会看到广播状态的相同值。
只有在KeyedBroadcastProcessFunction
的processBroadcastElement
方法中,才能修改(或清除)广播状态,重要的是,无论发生什么修改(或删除),都要在所有并行实例中以相同的方式进行。这样设计是为了保证每个并行实例在广播状态下都有相同的内容。忽略此规则将导致状态不一致,这可能非常难以调试。有关详细信息,请参阅文档。
因此,是的,如果您对广播状态调用.clear(),那么所有键的所有广播状态都将被删除。或者,您可以从广播状态中删除特定项(记住,广播状态是MapState),在这种情况下,将为所有键删除该特定项。
Flink培训网站上有几个使用广播状态的例子。参见
- https://training.da-platform.com/exercises/ongoingRides.html
- https://training.da-platform.com/exercises/nearestTaxi.html
- https://training.da-platform.com/exercises/taxiQuery.html