我正在修订的代码存在性能问题,每次在执行计数时都会给出OOM
。我想我发现了问题,基本上是在keyBy
变形之后,被处决aggregateByKey.
问题在于几乎 98% 的 RDD 元素具有相同的键,因此 aggregationByKey 生成随机,将几乎所有记录放入同一分区,底线:只有少数执行器工作,并且内存压力很大。
这是代码:
val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
.keyBy(po => po.getProcessCreator.name)
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
.map {case(name,list) =>
val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
lastOfGroupByKeys.flatMap(f => f._2)
}
.flatMap(f => f)
log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)
我将以一种更并行的方式执行操作,允许所有执行程序几乎平等地工作。我该怎么做?
我应该使用自定义分区程序吗?
如果你的观察是正确的,并且
98% 的 RDD 元素具有相同的密钥
然后更改分区程序根本无济于事。根据分区程序的定义,98% 的数据必须由单个执行器处理。
幸运的是,糟糕的代码可能是比歪斜更大的问题。跳过:
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
这只是一个民间魔法,看起来整个管道可以重写为一个简单的reuceByKey
.伪代码:
将
name
和本地密钥合并为一个密钥:def key(po: AnomalyPO) = ( // "major" key po.getProcessCreator.name, // "minor" key po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID )
包含名称、日期和其他字段的键应比单独的名称具有高得多的基数。
映射到对并按键减少:
rddAnomalies .map(po => (key(po), po)) .reduceByKey((x, y) => if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y )