假设我有一个带有SessionWindowing的Kafka流,例如:
windowedBy(SessionWindows.with(inactivity_time).until(aWeek))
是否可以根据以下记录的到达反复更改窗口的非活动时间间隔?
例如,如果我最初用第一条记录定义了我的inactivity_time=360000
,那么如果在inactivity_time/2
之后没有到达相同密钥的第二条记录,是否可以修改此值?
不幸的是,内置功能无法实现这一点。
这样做需要您自己实现这个自定义功能,例如使用Kafka Streams的处理器API(然后您可以将由此产生的处理器/转换器插入Kafka Stream的DSL(。