Equivalent of DataSet groupBy/withPartitioner for DataStream



以前使用DataSet我可以做一个.groupBy(...),然后.withPartitioner(...)来创建组,以便将一个组(已知比其他所有组大得多)分配给它自己的插槽,其他组将分布在剩余的插槽中。

在切换到DataStream时,我没有看到任何直接的方法可以做同样的事情。如果我深入研究.keyBy(...),我看到它使用带有KeyGroupStreamPartitionerPartitionTransformation,这是有希望的 - 但PartitionTransformation是一个仅限内部使用的类(或者注释是这样说的)。

对于实现相同结果的DataStream,推荐的方法是什么?

有了DataStream,就没有那么简单了。你可以实现一个与partitionCustom一起使用的自定义Partitioner,但是你没有KeyedStream,所以不能使用键控状态或计时器。

另一种解决方案是执行两步本地/全局聚合,例如

.keyBy(randomizedKey).process(local).keyBy(key).process(global)

在某些情况下,第一级随机键控是不必要的(如果键已经在源分区之间很好地分布)。

原则上,给定热键的先验知识,您应该能够以某种方式实现一个KeySelector,该可以很好地平衡任务槽之间的负载。我相信一两个人实际上已经这样做了(通过蛮力搜索从原始密钥到实际密钥的合适映射),但我手头没有参考实现。

正如David所指出的,您有时可以执行double-keyBy技巧(最初使用随机键)来减少键倾斜的影响。就我而言,这是不可行的,因为我使用具有大量内存要求的大型深度学习网络处理每个组中的记录,这意味着在第一次分组中同时加载所有模型。

我重用了我在旧版本的 Flink 上使用的技术,在这种技术中,您决定哪个子任务(运算符索引)应该获取每条记录,然后计算 Flink 将分配给目标子任务的键。计算整数键的代码如下所示:

public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism,
int operatorIndex) {
for (int i = 0; i < maxParallelism * 2; i++) {
Integer key = new Integer(i);
int index = KeyGroupRangeAssignment.assignKeyToParallelOperator(
i, maxParallelism, parallelism);

if (index == operatorIndex) {
return key;
}
}
throw new RuntimeException(String.format(
"Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d",
operatorIndex, maxParallelism, parallelism));
}

但请注意,这是非常脆弱的,因为它取决于内部 Flink 实现细节。

相关内容

  • 没有找到相关文章

最新更新