我有以下形式的预订元素流:
Booking(id=B1, driverId=D1, time=t1, location=l1)
Booking(id=B2, driverId=D2, time=t2, location=l2)
我需要查找每个位置的过去 15 分钟内的预订计数。但是,对于某个位置的任何新预订,都应评估该窗口。
大致像:
Assuming `time` field is set as timestamp of record
bookingStream.keyBy(b=>b.location).window(Any window of 15mins).trigger(triggerFunction)
除了trigger function should not be evaluated
在 15 分钟结束时,而是whenever any booking arrives at a location
,并且emit the count of booking in last 15min from timestamp of newly arrived booking
。
方法:
使用RichMap功能,将位置预订的优先级队列维护为托管状态(ValueState(,并将时间戳作为预订的优先级。对于到达的每个元素,首先将其添加到状态并从当前到达的元素中删除早于 15 分钟的元素。向收集器发出优先级队列中剩余元素的计数。
这是正确的方法,还是可以通过以更好的方式使用其他一些 flink 结构来实现。
如果你在基于堆的状态后端上运行,你建议的内容应该表现得相当好。但是使用 RocksDB,您将不得不对每次访问的优先级队列进行序列化/反序列化,这可能会相当痛苦。
在 RocksDB 上可能表现更好的一种方法是将当前计数与 ValueState 中的最早时间戳以及 ListState 中的预订集一起保留。RocksDB 状态后端可以附加到 ListState 而无需经过 ser/de,因此您只需在最早的元素太旧时反序列化和重新序列化整个列表。