I am using Spark 3.0.1 with Scala.
I have a huge dataset with similar entries/rows, and I want to:
- group the rows by a key within each partition,
- select a subset of rows from each group within a partition,
- get the column values for each group in a partition.
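(For concreteness, the snippets below assume a toy df1 along these lines; the column names group_key, length and fathers are taken from the code further down, the values are made up:)

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().master("local[*]").getOrCreate()
import sparkSession.implicits._

// stand-in for the real dataset; schema mirrors the snippets below
val df1 = Seq(
  (1, 10.0, "a"),
  (1, 3.0, "b"),
  (2, 7.5, "c")
).toDF("group_key", "length", "fathers")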
This is easy to do with the built-in API functions (which, I think, work much like standard SDK functions):
val df2 = df1
  .groupBy("group_key", df1.columns: _*)
  .min("length")
df2.show()
But I cannot figure out the syntax for running a user-defined function instead. I tried the following version:
def similar(l1: List[Row], l2: List[Row]): List[Row] = {
  // logic to choose non-similar rows and to limit the list size,
  // so that partitions do not become imbalanced
  l1 ++ l2
}
val aggFunction = new Aggregator[Row, List[Row], List[Row]](
  createCombiner = List[Row],
  mergeValue = similar,
  mergeCombiners = similar
)
val aggUdaf = udaf[Row, List[Row], List[Row]](aggFunction)
val df2 = df1
  .groupBy("group_key", df1.columns: _*)
  .agg(aggUdaf(col("fathers")))
df2.show()
but I cannot get this syntax to compile.
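(As far as I can tell, Spark 3's Aggregator is an abstract class, not a constructor taking createCombiner/mergeValue/mergeCombiners, so it has to be subclassed with zero, reduce, merge, finish and two encoders. Below is a sketch of the shape that should compile, using a hypothetical Entry case class instead of Row, since udaf derives an encoder for the input type and cannot do that for Row; Entry, SimilarAgg and similarEntries are made-up names:)

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// hypothetical typed view of one row of df1
case class Entry(group_key: Int, length: Double, fathers: String)

def similarEntries(l1: List[Entry], l2: List[Entry]): List[Entry] =
  l1 ++ l2 // placeholder for the real non-similarity / size-limit logic

object SimilarAgg extends Aggregator[Entry, List[Entry], List[Entry]] {
  def zero: List[Entry] = Nil
  def reduce(buf: List[Entry], e: Entry): List[Entry] = similarEntries(buf, List(e))
  def merge(b1: List[Entry], b2: List[Entry]): List[Entry] = similarEntries(b1, b2)
  def finish(buf: List[Entry]): List[Entry] = buf
  def bufferEncoder: Encoder[List[Entry]] = ExpressionEncoder()
  def outputEncoder: Encoder[List[Entry]] = ExpressionEncoder()
}

// the wrapped aggregator takes one column per field of Entry
val similarUdaf = udaf(SimilarAgg)
val df2 = df1
  .groupBy("group_key")
  .agg(similarUdaf(col("group_key"), col("length"), col("fathers")))
df2.show()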
What I am trying to do, as described in the question, is:
- group by a key
- perform a user-defined operation on each group (partially reducing its rows)
- sort the results
- limit the results per group by the sort criterion
Instead of implementing an Aggregator on the grouped DataFrame, I converted to an RDD:
// seqOp: each element m is one complete group (key, rows); keep the
// resultsLimit rows with the lowest score and add them to the accumulator
def partitionOperation = (acc: Array[Row], m: (Int, Iterable[Row])) => {
  val lowestNResultsPerGroupByCriterion = m._2.foldLeft(Array.empty[Row]) { (best, r) =>
    // keep `best` sorted ascending by the criterion and capped at
    // resultsLimit, so the buffer stays small even for huge groups
    (best :+ r).sortBy(_.getAs[Double](scoreCriterion)).take(resultsLimit)
  }
  acc ++ lowestNResultsPerGroupByCriterion
}
// combOp: after groupBy, all rows of a key sit in a single partition,
// so merging partition results is plain concatenation
def crossPartitionOperation = (acc1: Array[Row], acc2: Array[Row]) => acc1 ++ acc2
val h = df
  .rdd
  .groupBy(r => r.getAs[Int](groupCriterion))
  .aggregate(Array[Row]())(partitionOperation, crossPartitionOperation)
// aggregate is an action, so h is a plain Array[Row] on the driver
h.foreach(println)
// ship the reduced rows back out to continue with the DataFrame API
val dff = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(h), df.schema
)
val f = dff
  // .orderBy(groupCriterion)
  .orderBy(scoreCriterion)
  // .filter(r => r.getAs[Int](groupCriterion) == 5)
f.show(120)
println(s"Top $resultsLimit $scoreCriterion of each $groupCriterion.nTotal results: ${f.count()}")