我有一种情况,使用State
和TimeService
对大规模消息进行滑动计数。滑动尺寸为 1,窗口尺寸大于 10 小时。我遇到的问题是检查点需要很多时间。为了提高性能,我们使用增量检查点。但是当系统执行检查点时,它仍然很慢。我们发现大部分时间用于序列化用于清理数据的计时器。我们为每个键都有一个计时器,总共有大约 300M 计时器。
任何解决此问题的建议将不胜感激。 或者我们可以用另一种方式进行计数? ———————————————————————————————————————————— 我想补充一些细节。滑动大小是一个事件,窗口大小超过 10 小时(每秒大约有 300 个事件),我们需要对每个事件做出反应。所以在这种情况下,我们没有使用 Flink 提供的窗口。我们使用keyed state
来存储之前的信息。timers
用于ProcessFunction
触发旧数据的清理作业。最后,本能键的数量非常多。
我认为这应该有效:
通过有效地执行类似keyBy(key mod 100000)之类的操作,将Flink正在使用的键数量从300M大幅减少到100K(例如)。然后,您的ProcessFunction可以使用MapState(其中键是原始键)来存储它需要的任何内容。
MapState 具有迭代器,您可以使用这些迭代器定期爬网每个映射以使旧项目过期。坚持每个键只有一个计时器的原则(如果你愿意,每个超级键),这样你只有 100K 个计时器。
更新:
Flink 1.6 包含 FLINK-9485,它允许异步检查点计时器,并存储在 RocksDB 中。这使得 Flink 应用程序拥有大量计时器更加实用。
如果不是使用计时器,而是向流的每个元素添加一个额外的字段来存储当前处理时间或到达时间,该怎么办?因此,一旦您想从流中清除旧数据,您只需要使用过滤器运算符并检查是否要删除旧数据。
与其在每个事件上注册一个清除计时器,不如在每个时间段只注册一次计时器,例如每 1 分钟注册一次?您只能在第一次看到密钥时注册它,并在onTimer
中刷新它。喜欢:
new ProcessFunction<SongEvent, Object>() {
...
@Override
public void processElement(
SongEvent songEvent,
Context context,
Collector<Object> collector) throws Exception {
Boolean isTimerRegistered = state.value();
if (isTimerRegistered != null && !isTimerRegistered) {
context.timerService().registerProcessingTimeTimer(time);
state.update(true);
}
// Standard processing
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out)
throws Exception {
pruneElements(timestamp);
if (!elements.isEmpty()) {
ctx.timerService().registerProcessingTimeTimer(time);
} else {
state.clear();
}
}
}
Flink SQLOver
子句也实现了类似的东西。你可以看看这里