我有一个带有不同键的消息流。对于每个键,我想创建一个事件时间会话窗口,并仅在以下情况下对其进行一些处理:
MIN_EVENTS
窗口中累积的事件数(本质上是键控状态(
对于每个键,MIN_EVENTS
是不同的,并且可能会在运行时更改。我在实施这一点时遇到了困难。特别是,我像这样实现这个逻辑:
inputStream.keyBy(key).
window(EventTimeSessionWindow(INACTIVITY_PERIOD).
trigger(new MyCustomCountTrigger()).
apply(new MyProcessFn())
我正在尝试创建一个自定义MyCustomCountTrigger()
,该应该能够从状态存储(例如将key
映射到其MIN_EVENTS
参数的MapState<String, Integer> stateStore
(中读取。我知道我可以使用所有触发器可用的TriggerContext ctx
对象访问状态存储。
如何从 CountTrigger(( 类外部初始化此状态存储?我找不到这样做的例子。
可以根据发送到 Trigger 类构造函数的参数初始化状态。但是,您无法从该类外部访问状态。
如果您需要更大的灵活性,我建议您使用流程函数而不是窗口。