我有歪斜当我keyBy对我的数据。假设关键字是:
case class MyKey(x: X, y:Y)
为了解决这个问题,我正在考虑添加一个额外的字段,通过使用该字段仅用于分区,甚至在工作人员之间进行分发:
case class MyKey(z: evenlyDistributedField, x: X, y:Y) extends MyKey(x, y) {
override def hashCode(): Int = z.hashCode
}
由于这一行,我的记录将使用覆盖的hashCode,并均匀地分布到每个worker,并使用原始的equals方法(只考虑X和Y字段)在以后的有状态操作符中找到适当的关键状态。
我知道相同的(X, Y)对将在不同的worker中结束,但我可以稍后处理。(在用我的新键进行必要的处理以避免歪斜之后)。
我的问题是其他地方是关键的hashCode方法被使用?
当我看到扩展类在hashMap中使用键来获取该键的状态时,我怀疑当获得键状态时(顺便说一下,什么是名称空间?)我知道从映射中检索KeyedState会比较慢,因为hashCode不会考虑X、Y字段。但是在flink代码中还有其他地方使用了键的hashcode方法吗?
还有其他方法可以解决这个问题吗?我想到了物理分区,但是我不能使用keyBy。
综上所述:
- 将我的数据随机分配到每个worker中以产生均匀分布
- [编辑]在每个分区中独立地执行.window().aggregate()(就像其他分区不存在一样)。每个窗口聚合中的数据应该在这个分区的(X,Y)s上进行键化。忽略其他分区中相同的(X,Y)键。
- 合并由于相同的(X,Y)对出现在不同的分区后的冲突(这我不需要指导。我通过on (X, Y)) 创建一个新键
在这种情况下,我通常创建一个瞬态Tuple2<MyKey, Integer>
,在Tuple.f1
字段中填充我想要用来划分的任何内容。.keyBy()
之后的map
或flatMap
操作可以释放MyKey
。这避免了与MyKey.hashCode()
混淆。
请注意,hashCode()
和equals()
方法的字段集不同会导致痛苦和痛苦。Java有一个"等于一致性"的约定:彼此相等的对象必须返回相同的hashcode。
(更新)
如果你不能卸载大量的非关键工作,那么我会做的是…
- 将
Tuple2<MyKey, Integer>
中的Integer
设置为hashCode(MyKey) % <operator parallelism * factor>
。假设您的parallelism * factor
足够高,您只会得到2(或更多)组进入同一子任务的少数情况。 - 在操作符中,使用
MapState<MyKey, value>
存储状态。你需要这个,因为你会得到多个唯一的MyKey
值到相同的关键字组。 - 执行您的处理并从该操作符发出
MyKey
。
通过使用hashCode(MyKey) % some value
,您应该获得每个子任务的唯一MyKey
值的很好的组合,这应该减轻倾斜。当然,如果一个值占主导地位,那么您将需要另一种方法,但由于您没有提到这一点,我假设情况并非如此。