在Spark (Scala)如何在洗牌之前对分组行执行用户定义的操作?



我使用Spark 3.0.1和Scala

我有一个庞大的数据集,其中有类似的(条目)/行,我想

  1. 按分区中的键对行进行分组。
  2. 从每个分区内的每个组中选择一个行子集
  3. 获取分区中每个组的列值。

很容易使用API函数来执行此操作(我认为与标准SDK函数类似)

val df2 = df1
.groupBy("group_key", df1.columns: _*)
.min("length")

df2
.show()

但是我不知道执行用户定义函数的语法

我尝试了以下版本:

def similar(l1: List[Row], l2: List[Row]): List[Row] = {
//logic to choose non-similar rows and limit list size to avoid 
// imbalanced partitions 
l1 ++ l2
}
val aggFuntion = new Aggregator[Row, List[Row], List[Row]](
createCombiner = List[Row],
mergeValue = similar,
mergeCombiners = similar
)
val aggUdaf = udaf[Row, List[Row], List[Row]](aggFuntion)

val df2 = df1
.groupBy("group_key", df1.columns: _*)
.agg(aggUdaf(col("fathers")))

df2
.show()

但是无法使用语法

我试图按照问题

  1. 按键分组
  2. 对每个组执行用户定义的操作(部分减少行)
  3. 排序结果
  4. 按排序标准限制每组的结果

不是在分组的DataFrame上实现聚合器,而是转换为RDD:

def partitionOperation = (acc: Array[Row], m: (Int, Iterable[Row])) => {
val lowestNResultsPerGroupByCriterion: Array[Row] = m._2.foldLeft(Array[Row]())((acc, r) => {
val criterion = r.getAs[Double](scoreCriterion)
val acc1 = acc.length match {
case a if a < resultsLimit => (acc :+ r).sortBy(r => r.getAs[Double](scoreCriterion))
case a =>
val i = 0
var acc2 = acc
acc.takeWhile(r1 => {
val criterion1 = r1.getAs[Double](scoreCriterion)
if (criterion < criterion1) {
acc2 = acc.slice(0, i) ++ Array(r) ++ acc.slice(i, a)
}
criterion < criterion1
})
if (acc2.length > resultsLimit) {
acc2.slice(0, resultsLimit)
}
else {
acc2
}
}
acc1
})
lowestNResultsPerGroupByCriterion
}
def crossPartitionOperation = (acc1: Array[Row], acc2: Array[Row]) => acc1 ++ acc2
val h = df
.rdd
.groupBy(r => r.getAs[Int](groupCriterion))
.aggregate(Array[Row]())(partitionOperation, crossPartitionOperation)
h.foreach(r => {
println(r)
})
val dff = sparkSession.sqlContext.createDataFrame(
sparkSession.sparkContext.parallelize(h), df.schema
)
val f = dff
//      .orderBy(groupCriterion)
.orderBy(scoreCriterion)
//      .filter(r => r.getAs[Int](groupCriterion) == 5)
f.show(120)
println(s"Top $resultsLimit $scoreCriterion of each $groupCriterion.nTotal results: ${f.count()}")

最新更新