flink 减少状态影响性能



当我们使用ReducingState RecordStore.add(r(时发现,性能是波动的,如图所示,

没有还原状态: 稳定的性能图

使用还原状态: 波动性能图表

整体性能(下降超过100%! 没有 Reducingstate.add VS with ReducingState.add

它可以通过一个简单的应用程序轻松重现,没有检查点,只需简单地存储记录,也使用简单的"总和"缩减功能(实际上使用空函数会看到相同的结果(。任何想法将不胜感激。多么难以置信的明显问题。

基本上,应用程序只是将记录存储到状态中,我们在"JsonTranslator"中测量每秒有多少条记录,如图所示。两者之间的区别只有 1 行,注释/取消注释"recStore.add(r("。

了解状态会影响性能,但这是它的工作方式吗?

DataStream<String> stream = env.addSource(new GeneratorSource(loop);
DataStream<JSONObject> convert = stream.map(new JsonTranslator(statsdUrl))
.keyBy(new KeySelector<JSONObject, AggregationKey>() {... ...})
.process(new ProcessAggregation(aggrDuration, statsdUrl))
.map(new PassthruFunction(statsdUrl));  

public class ProcessAggregation extends ProcessFunction<JSONObject, JSONObject> {
private ReducingState<JSONObject> recStore;
public void processElement(JSONObject r, Context ctx, Collector<JSONObject> out) {
recStore.add(r); //this line make the difference
}

如果任务可以在具有少量线程的单台计算机上轻松完成,那么如果执行托管状态对性能的影响太大,flink对你来说可能是矫枉过正。

也就是说,您不需要以这种方式直接使用ReducingState,通常您会在 Windowed 运算符上使用aggregatereduce函数(另外,您的窗口是什么?目前还不清楚你什么时候输出你的结果。您是否持续发出聚合?

您的源是否生成了进入多个键的数据?

您使用的是默认状态后端还是使用 RocksDB?

此外,你可以考虑使用 Flink 提供的便利sum功能,这将允许你指定要添加哪些字段。

我已经用你分享的代码进行了一些实验。我只在我的笔记本电脑上运行它。我保留了所有 statsd 代码,但我没有运行 statsd。相反,我将web.refresh-interval到 1 秒配置为 Flink Web 仪表板并观察numRecordsOutPerSecond。我唯一改变的是修改GeneratorSource以连续运行,以便我可以观察稳定状态行为。

这是我所看到的:

  1. 除了在作业开始时,我没有看到吞吐量有任何剧烈波动。有一个大约 30 秒的初始周期,在此期间吞吐量稳步上升到一个值,然后保持相当一致(在初始启动阶段之后,无论有没有ReducingState,它上下变化约 10%(。

  2. 将 Flink 版本从 1.3.2 更新到 1.5.0 将整体吞吐量提高了近 2 倍。这并不奇怪,因为自 1.3 以来,在 Flink 的网络堆栈上做了很多工作。

  3. 注释掉mergedRecordStore.add(r);还可以将吞吐量提高约 2 倍。

查看代码,我看到一件事引起了一些痛苦。您正在使用 JSONObjects 进行键控、序列化/反序列化和缩减。这很昂贵。最好将 JSON 转换为 POJO 或元组,这样使用起来会更便宜。

相关内容

  • 没有找到相关文章

最新更新