我的用例
- 输入是由 ID 键控的原始事件
- 我想计算每个 ID 在过去 7 天内的事件总数。
- 输出将每 10 分钟提前一次
- 从逻辑上讲,这将由大小为 7 天的滑动窗口处理,并提前 10 分钟
这篇文章通过 1 天的翻滚窗口列出了一个很好的优化解决方案
所以我的逻辑是这样的
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val oneDayCounts = joins
.keyBy(keyFunction)
.map(t => (t.key, 1L, t.timestampMs))
.keyBy(0)
.timeWindow(Time.days(1))
val sevenDayCounts = oneDayCounts
.keyBy(0)
.timeWindow(Time.days(7), Time.minutes(10))
.sum(1)
// single reducer
sevenDayCounts
.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(10)))
附言忘记单个减速器的性能问题。
问题
但是,如果我理解正确,这意味着由于滑动窗口的性质,单个事件将产生 7*24*6=1008 条记录。所以我的问题是我怎样才能减少绝对的数量?
有一个JIRA票证 - FLINK-11276 - 和一个关于更有效地做到这一点的谷歌文档。
我还建议您查看本文,并讨论使用流切片的高效窗口聚合。