Flink批量加入性能



我一直在用TableApi和DataStream api以批处理模式测试一个简单的联接。然而,我的成绩一直很糟糕,所以一定是我做错了什么。用于连接的数据集为~900gb和3gb。用于测试的环境是具有10*m5.x大型工作节点的EMR。

所使用的TableApi方法是在数据s3路径上创建一个表,并在目标s3路径上对创建的表执行插入语句。通过调整任务管理器内存、任务插槽数量、并行性,但无法使其在可接受的时间内执行(至少1.5小时(。

当在批处理模式下使用DataStreamApi时,我总是遇到这样一个问题,即由于使用了超过90%的磁盘空间,yarn会杀死任务。所以我很困惑这是由于代码的原因,还是只是flink需要比spark多得多的磁盘空间。读取数据流:

val sourceStream: DataStream[SourceObj] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "someSourceName")
.map(x => SourceObj.convertFromString(x))

加入:

val joinedStream = sourceStream.join(sourceStream2)
.where(col1 => sourceStream.col1)
.equalTo(col2 => sourceStream2.col2)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(1))
.apply{
(s, c) => JoinedObj(c.col1, s.col2, s.col3)
}

我是遗漏了什么,还是只需要扩大集群?

通常,您最好使用Flink的Table/SQL API实现关系工作负载,这样它的优化器就有机会提供帮助。

但如果我没看错的话,这个特殊的联接执行起来会非常昂贵,因为状态中没有任何内容过期。这两个表都将在Flink中完全具体化,因为对于这个查询,每一行输入都保持相关,并可能影响结果。

如果您可以将其转换为某种带有时间约束的联接,优化器可以使用该联接来释放不再有用的行,那么它的性能会更好。

在批处理模式下使用DataStream API时,它在所有shuflle/join/reduce操作中广泛使用托管内存。此外,正如最后一段所述,Flink将在连接期间将无法放入内存的所有数据溢出到磁盘。

因此,我认为这可能是磁盘空间短缺问题的原因。我的工作也遇到了同样的问题。

相关内容

  • 没有找到相关文章

最新更新