提高groupReduce转换的并行化程度



在我的Flink程序中,我使用flatMap操作转换数据,该操作将多个数据块划分为多个较小的块。这些块有一个"位置"属性,用于描述它们在相应原始块中的位置。现在我使用groupReduce,它需要转换所有共享相同"位置"属性的小块。因此,它应该可以很容易地分布在多个节点上。但是,当我在多个节点上运行程序时,groupReduce将以dop为1执行。

我想这是因为我只有一个DataSet,但在Flink Java API中似乎没有GroupedDataSet。是否还有另一种可能性来提高我的groupReduce转化的dop?

这是我正在使用的代码(忽略"细节"的伪代码):

DataSet<SlicedTile> slicedTiles = tiles.flatMap()
    .groupBy(position)
    .sortGroup(time)
    .getDataSet()
    //Until here the dop is correct
DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup;

代码的问题在于getDataSet()调用。它返回分组操作的输入。因此,由slicedTiles表示的数据集既没有分组,也没有对其组进行排序,而是flatMap变换的结果,并且程序中根本不考虑groupBysortGroup调用。

对未分组的数据集应用groupReduce(或reduce)操作始终是非并行操作,因为输入数据集的所有元素都作为单个组进行处理。

从逻辑上讲,三个变换groupBy().sortGroup().reduceGroup()属于一起,并被转换为单个groupReduce算子(如果GroupReduceFunction是可组合的,则可能具有额外的组合器)。

如果您按照以下方式更改您的实现,那么它应该按预期工作。

DataSet<SlicedTile> slicedTiles = tiles.flatMap()
    .groupBy(position)
    .sortGroup(time)
    .reduceGroup(yourFunction);

我将打开一个JIRA问题,将JavaDocs添加到Grouping.getDataSet()方法中,以记录该函数的行为。

相关内容

  • 没有找到相关文章

最新更新