我需要使用处理器API实现类似于会话窗口的逻辑,以便对状态存储进行完全控制。由于处理器API不提供窗口抽象,因此需要手动完成。然而,我找不到KStreams会话窗口逻辑的源代码,无法获得一些初步想法(特别是关于会话超时(。
我本来希望使用punctuate
方法,但它是一个每个处理器的计时器,而不是每个键的计时器。此外,SessionStore<K, AGG>
没有提供API来遍历所有键的数据库。
[更新]
例如,假设处理器实例正在处理K1,并且流时间增加,这导致K2的会话超时。K2可能存在,也可能根本不存在。你怎么知道存在一个特定的密钥(比如K2,当流时间增加时(处理不同的密钥时(?换句话说,当流时间增加时,你如何计算哪些窗口过期了(因为你不知道这些密钥存在(?
这是DSL代码:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java——希望能有所帮助。
不过,目前还不清楚你的问题是什么——主要是陈述。因此,让我试着给出一些一般性的答案。
在DSL中,会话是根据"流时间"进度关闭的。仅依靠输入数据使操作具有确定性。使用挂钟时间将引入非确定性。因此,在DSL实现中不需要使用Punctuation
。
此外,
SessionStore<K, AGG>
不提供API来遍历所有键的数据库。
DSL中的会话基于密钥,因此在一定时间范围内以每个密钥为基础扫描存储就足够了(通过findSessions(...)
完成(。
更新:
在DSL中,每次更新会话窗口时,都会立即向下游发送相应的更新事件。因此,DSL实现不会等待"流时间"进一步推进,而是立即发布当前(可能是中间(结果。
为了遵守宽限期,将记录时间戳与"流时间"进行比较,如果相应的会话窗口已经关闭,则跳过该记录(参见。https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L146)。也就是说,关闭一个窗口只是一个合乎逻辑的步骤(而不是实际操作(;会话仍将被存储,并且如果窗口被关闭,则不需要向下游发送额外的事件,因为最终结果已经在对窗口的最后更新中向下游发送。
保留时间本身不能由Processor
实现来处理,因为它是SessionStore
的内置功能:在内部,会话存储维护所谓的"段",这些"段"存储特定时间段的会话。每次执行put()
时,存储都会检查是否可以丢弃旧段(基于put()
提供的时间戳(。也就是说,旧会话被延迟删除,并作为批量删除(即,整个段的所有会话都将同时删除(,因为它比单独删除更有效。