在 UDAF 的每个更新步骤上创建新的累加器



我正在根据UDAF示例实现UDAF。 那里的update阶段如下所示:

public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
String inputKey = input.getString(0);
Map<String, Long> inputValues = input.<String, Long>getJavaMap(1);
Map<String, Map<String, Long>> newData = new HashMap<>();
if (!buffer.isNullAt(0)) {
Map<String, Map<String, Long>> currData = buffer.<String, Map<String, Long>>getJavaMap(0);
newData.putAll(currData);
}
newData.put(inputKey, inputValues);
buffer.update(0, newData);
}
}

您可以看到,在每一步中都会创建一个新的 HashMap(newData),并且来自上一个缓冲区的数据被复制到其中。这看起来像是一种可怕的浪费,必须创建新的地图并复制所有元素。所以我尝试了(就我而言,我有一张类型略有不同的地图):

bufferJavaMap = buffer.<String, Integer>getJavaMap(0);
bufferJavaMap.put("aaaa", 1);
buffer.update(0, bufferJavaMap);

我收到以下错误:

java.lang.UnsupportedOperationException
at java.util.AbstractMap.put(AbstractMap.java:209)
at dns.MergeMapUDAF.update(MergeMapUDAF.java:84)

是不是可以更新现有的地图? 更新此地图的最佳方法是什么?

无法

更新现有地图吗?

这是不可能的,但问题比您的问题更复杂。Spark 在getupdate上都制作了结构的完整副本,因此即使删除显式副本也无法解决问题。

如果需要性能,应避免将UserDefinedAggregateFunction与非原子类型一起使用。

相关内容

  • 没有找到相关文章

最新更新