避免分区不平衡 Spark



我正在修订的代码存在性能问题,每次在执行计数时都会给出OOM。我想我发现了问题,基本上是在keyBy变形之后,被处决aggregateByKey.问题在于几乎 98% 的 RDD 元素具有相同的键,因此 aggregationByKey 生成随机,将几乎所有记录放入同一分区,底线:只有少数执行器工作,并且内存压力很大。

这是代码:

val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
    .keyBy(po => po.getProcessCreator.name)
    .aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
    .map {case(name,list) =>
      val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
      val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
      lastOfGroupByKeys.flatMap(f => f._2)
    }
    .flatMap(f => f)
log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)

我将以一种更并行的方式执行操作,允许所有执行程序几乎平等地工作。我该怎么做?

我应该使用自定义分区程序吗?

如果你的观察是正确的,并且

98% 的 RDD 元素具有相同的密钥

然后更改分区程序根本无济于事。根据分区程序的定义,98% 的数据必须由单个执行器处理。

幸运的是,糟糕的代码可能是比歪斜更大的问题。跳过:

.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)

这只是一个民间魔法,看起来整个管道可以重写为一个简单的reuceByKey.伪代码:

  • name和本地密钥合并为一个密钥:

    def key(po: AnomalyPO) = (
      // "major" key
      po.getProcessCreator.name, 
      // "minor" key
      po.getPodId, po.getAnomalyCode,
      po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID
    )
    

    包含名称、日期和其他字段的键应比单独的名称具有高得多的基数。

  • 映射到对并按键减少:

    rddAnomalies
      .map(po => (key(po), po))
      .reduceByKey((x, y) => 
        if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y
      )
    

最新更新