Flink如何清理键控状态



当考虑通过某个东西设置键的行为时,我传统上会想到将所有与键匹配的事件放入同一个桶的类比。正如你所能想象的,当Flink应用程序开始处理大量数据时,你选择键入的内容开始变得重要,因为你想确保你很好地清理状态。这就引出了我的问题,Flink究竟是如何清理这些"水桶"的?如果bucket为空(所有MapStates和ValueStates均为空(,Flink是否关闭密钥空间的该区域并删除bucket?

示例:

传入数据格式:{userId,computerId,amountOfTimeLoggedOn}

密钥:UserId/ComputerId

当前密钥空间:

  • Alice,计算机10:其中有2个事件。两个事件都存储在状态中
  • 鲍勃,电脑11:里面没有事件。状态中没有存储任何内容

Flink最终会把Bob,Computer 11从密钥空间中删除吗?还是因为它曾经发生过一件事而永远存在?

Flink不会为没有任何用户值的状态键存储任何数据,至少在现有的状态后端:堆(内存中(或RocksDB中是这样。

密钥空间在Flink中是虚拟的,Flink不会对哪些具体密钥可能存在做出任何假设。每个密钥或密钥子集不存在任何预先分配的存储桶。只有当用户应用程序为某个密钥写入某个值时,它才会占用存储空间。

一般的想法是,所有具有相同密钥的记录都在同一台机器上处理(有点像你说的在同一个存储桶中(。某个密钥的本地状态也总是保存在同一台机器上(如果存储的话(。不过,这与检查点无关。

例如,如果在某个时间点为[Bob,Computer 11]写入了某个值,然后删除了该值,Flink将使用密钥将其完全删除。

简短回答

它在Flink State的生存时间(TTL(特性和Java垃圾收集器(GC(的帮助下进行清理。TTL功能将删除对状态条目的任何引用,GC将收回分配的内存。

答案很长

您的问题可分为3个子问题:

我会尽量简明扼要。

Flink如何根据Key对数据进行分区

对于键控流上的运算符,Flink在一致哈希算法的帮助下对键上的数据进行分区。它创建max_parallelism数量的bucket。每个操作员实例都被分配了一个或多个这样的bucket。每当数据被发送到下游时,密钥就会被分配到其中一个存储桶,然后被发送到相关的操作员实例由于范围是用数学方法计算的,因此此处不存储密钥。因此,任何时候都不会清除任何区域或删除bucket。您可以创建所需的任何类型的密钥。它不会在键空间或范围方面影响内存。

Flink如何使用密钥存储状态

所有操作员实例都有一个实例级状态存储。该存储定义了该操作员实例的状态上下文,并且它可以存储多个命名状态存储;计数"sum"一些名称";这些命名状态存储器是可以存储基于数据的密钥的值的密钥值存储器。

这些KV存储是在我们使用运算符的open()函数中的状态描述符初始化状态时创建的。即CCD_ 3。

只有当需要在状态中存储某些内容时,这些KV存储才会存储数据。(如HashMap.put(k,v)(因此,除非调用状态更新方法(如updateaddput(,否则不会存储任何键或值

所以,

  • 如果Flink没有看到密钥,则不会为该密钥存储任何内容
  • 如果Flink看到了密钥,但没有调用状态更新方法,则不会为该密钥存储任何内容
  • 如果为密钥调用状态更新方法,则密钥-值对将存储在KV存储中

Flink如何清理密钥的状态

Flink不会删除状态,除非用户需要或用户手动完成。如前所述,Flink具有用于状态的TTL功能。此TTL将标记状态过期,并在调用清理策略时将其删除。这些清理策略因wrt后端类型和清理时间而异。对于堆状态后端,它将从状态表中删除条目,即删除对该条目的任何引用。这个未引用的条目占用的内存将由Java GC清理。对于RocksDB State Backend,它只需调用RocksDB的本地delete方法。

相关内容

  • 没有找到相关文章