我只在一个具有Parallelism = 1
的节点上运行Flink
,以便将其性能与单线程应用程序进行比较。我想知道Flink
是否仍在使用Shuffle
,尽管它不是并行运行的。因此,如果执行以下命令:
var counts = text.flatMap { _.toLowerCase.split("\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
会在groupBy
之前使用Shuffle
吗?有办法检查一下吗?
(在交互式Scala Shell
的输出中,有一个FlatMap
、Map
、Combine
,最后还有一个Reduce
需要观察。使用Parallelism > 1
运行时也是如此。)
ds.groupBy(0).sum(1)
生成独立于实际并行度的作业图... -> Combiner -> Reducer
。在Combiner
和Reducer
之间引入了散列分割器(shuffle步骤)。这对所有parallelism > 1
都有意义。
对于parallelism = 1
,优化器理论上可以删除shuffle步骤,因为这不是必需的。但是,它实际上不应该影响程序的性能。
原因是对于parallelism = 1
,所有工作都将在本地组合器中完成。这意味着组合器计算得到的和,然后只向reducer发送单个元素。此外,由于组合器和减速器在同一台机器上运行,因此不涉及网络通信。数据只是通过移交一个内存段来传输的。由于Flink还支持流式混洗,所以组合器甚至不必在第一个结果发送到reducer之前完成。合并器和还原器可以同时运行,从而避免了中间结果的具体化。