我有一个程序进行两阶段聚合,以解决我工作中的数据偏斜问题。我使用了一个简单的ThreadLocalRandom
为我的原始文件生成了一个后缀,比如:
private class KeyByTileWithSalt implements KeySelector<Type, String> {
@Override
public Long getKey(Type value) {
return value.toString() + ThreadLocalRandom.current().nextLong(1, 8);
}
}
但是Flink在为键添加salt时抛出NullPointerException,我正在对某个字段进行窗口聚合。
我在flink邮件列表上发现了类似的帖子,并得到了可能发生异常的原因,但我仍然无法在我的程序中找到关于unstable of hash value
的错误。有什么想法吗?
Flink依赖于keyBy
在集群中具有确定性的结果。这是必要的,这样集群中的每个节点都有一个关于哪个节点负责处理每个密钥的一致视图。通过让密钥依赖于ThreadLocalRandom
,您违反了这一假设。
相反,您可以在摄取过程中为每个用随机值填充的记录添加一个字段,然后将该字段用作密钥。