在我的Flink程序中,我使用flatMap
操作转换数据,该操作将多个数据块划分为多个较小的块。这些块有一个"位置"属性,用于描述它们在相应原始块中的位置。现在我使用groupReduce
,它需要转换所有共享相同"位置"属性的小块。因此,它应该可以很容易地分布在多个节点上。但是,当我在多个节点上运行程序时,groupReduce
将以dop为1执行。
我想这是因为我只有一个DataSet
,但在Flink Java API中似乎没有GroupedDataSet
。是否还有另一种可能性来提高我的groupReduce
转化的dop?
这是我正在使用的代码(忽略"细节"的伪代码):
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.getDataSet()
//Until here the dop is correct
DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup;
代码的问题在于getDataSet()
调用。它返回分组操作的输入。因此,由slicedTiles
表示的数据集既没有分组,也没有对其组进行排序,而是flatMap
变换的结果,并且程序中根本不考虑groupBy
和sortGroup
调用。
对未分组的数据集应用groupReduce
(或reduce
)操作始终是非并行操作,因为输入数据集的所有元素都作为单个组进行处理。
从逻辑上讲,三个变换groupBy().sortGroup().reduceGroup()
属于一起,并被转换为单个groupReduce
算子(如果GroupReduceFunction
是可组合的,则可能具有额外的组合器)。
如果您按照以下方式更改您的实现,那么它应该按预期工作。
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.reduceGroup(yourFunction);
我将打开一个JIRA问题,将JavaDocs添加到Grouping.getDataSet()
方法中,以记录该函数的行为。