我正在根据UDAF示例实现UDAF。 那里的update
阶段如下所示:
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
String inputKey = input.getString(0);
Map<String, Long> inputValues = input.<String, Long>getJavaMap(1);
Map<String, Map<String, Long>> newData = new HashMap<>();
if (!buffer.isNullAt(0)) {
Map<String, Map<String, Long>> currData = buffer.<String, Map<String, Long>>getJavaMap(0);
newData.putAll(currData);
}
newData.put(inputKey, inputValues);
buffer.update(0, newData);
}
}
您可以看到,在每一步中都会创建一个新的 HashMap(newData),并且来自上一个缓冲区的数据被复制到其中。这看起来像是一种可怕的浪费,必须创建新的地图并复制所有元素。所以我尝试了(就我而言,我有一张类型略有不同的地图):
bufferJavaMap = buffer.<String, Integer>getJavaMap(0);
bufferJavaMap.put("aaaa", 1);
buffer.update(0, bufferJavaMap);
我收到以下错误:
java.lang.UnsupportedOperationException
at java.util.AbstractMap.put(AbstractMap.java:209)
at dns.MergeMapUDAF.update(MergeMapUDAF.java:84)
是不是可以更新现有的地图? 更新此地图的最佳方法是什么?
无法
更新现有地图吗?
这是不可能的,但问题比您的问题更复杂。Spark 在get
和update
上都制作了结构的完整副本,因此即使删除显式副本也无法解决问题。
如果需要性能,应避免将UserDefinedAggregateFunction
与非原子类型一起使用。