What is the Spark DataFrame equivalent of RDD aggregateByKey in Scala?



I wrote the code below to compute correlations on a DataFrame with grouping, but I ended up having to use RDD aggregateByKey with a sequence operation and a combine operation to get what I needed. However, I would like to achieve the same thing using only Spark DataFrames and avoid RDDs entirely. I have tried to learn the DataFrame "agg" and "groupBy" functions, but I am not entirely sure how to reproduce the result I get with the RDD. Any help here would be much appreciated.

val columnIndexes = columns.indices.map(i => i + groupIndexes.length).toArray
//removing rows with nulls in group by columns like the MR version
val cleanDF = selectedDF.na.drop("any", groupByColumns)
val allCountersPerGroupRDD: RDD[(immutable.IndexedSeq[Any], Seq[Seq[CovCounter]])] = cleanDF.rdd.map(row =>
//create key value pairs
(groupIndexes.map(ind => row.get(ind)), columnIndexes.map(i => toDouble(row.get(i)))))
.aggregateByKey(zeroCounters, numPartitions)(
seqOp = (counters, newValues) => {
for ((i, j) <- columnHalfPairedIndicesFlattened) {
counters(i)(j).addIfNotNaN(newValues(i), newValues(j))
}
counters
}, combOp = (baseCounters, otherCounters) => {
for ((i, j) <- columnHalfPairedIndicesFlattened) {
baseCounters(i)(j).merge(otherCounters(i)(j))
}
baseCounters
})

val finalRDD: RDD[Row] = allCountersPerGroupRDD.mapPartitions { iterator =>
iterator.flatMap { case (groupKeys, counts) =>
columns.indices.map(ind =>
Row.fromSeq(groupKeys ++ Seq(columns(ind)) ++ columnPairedIndicesAll(ind).map { case (i, j) =>
getCovOrCorrFromCounters(i, j, counts, useCorrelation)
}))
}
}
val outDF = sparkSession.createDataFrame(finalRDD, outputSchema)
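
For plain pairwise Pearson correlations, the "groupBy"/"agg" route being asked about would look roughly like the sketch below. It uses Spark's built-in corr aggregate, assumes columns and groupByColumns are the column-name sequences from the code above, and does not reproduce the CovCounter NaN handling:

import org.apache.spark.sql.functions.corr

// One built-in corr(...) aggregate expression per column pair.
val pairCorrs = for {
  (a, ia) <- columns.zipWithIndex
  (b, ib) <- columns.zipWithIndex
  if ia < ib
} yield corr(a, b).as(s"corr_${a}_${b}")

// One output row per group, one column per pair (a different layout than outDF above).
val corrPerGroup = cleanDF
  .groupBy(groupByColumns.head, groupByColumns.tail: _*)
  .agg(pairCorrs.head, pairCorrs.tail: _*)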

See https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html. You need to write your own UDAF.
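
A minimal sketch of such an aggregation, using the Spark 3.x Aggregator API registered via functions.udaf, is shown below. It only covers the covariance of a single pair of Double columns per group; the CovBuffer case class and the "x"/"y" column names are hypothetical and do not reproduce the original CovCounter logic:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Hypothetical running-sum buffer for one (x, y) covariance.
case class CovBuffer(n: Long, sumX: Double, sumY: Double, sumXY: Double)

object CovAgg extends Aggregator[(Double, Double), CovBuffer, Double] {
  // Empty buffer, analogous to one CovCounter in zeroCounters.
  def zero: CovBuffer = CovBuffer(0L, 0.0, 0.0, 0.0)

  // Plays the role of seqOp: fold one row into the buffer.
  def reduce(b: CovBuffer, xy: (Double, Double)): CovBuffer =
    CovBuffer(b.n + 1, b.sumX + xy._1, b.sumY + xy._2, b.sumXY + xy._1 * xy._2)

  // Plays the role of combOp: merge two partial buffers.
  def merge(b1: CovBuffer, b2: CovBuffer): CovBuffer =
    CovBuffer(b1.n + b2.n, b1.sumX + b2.sumX, b1.sumY + b2.sumY, b1.sumXY + b2.sumXY)

  // Final value: sample covariance from the running sums.
  def finish(b: CovBuffer): Double =
    if (b.n < 2) Double.NaN
    else (b.sumXY - b.sumX * b.sumY / b.n) / (b.n - 1)

  def bufferEncoder: Encoder[CovBuffer] = Encoders.product[CovBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register the Aggregator so it can be used inside groupBy(...).agg(...).
val covXY = udaf(CovAgg)

// "x" and "y" are placeholder column names; groupByColumns is from the question's snippet.
val covPerGroup = cleanDF
  .groupBy(groupByColumns.map(col): _*)
  .agg(covXY(col("x"), col("y")).as("cov_x_y"))

Generalizing this to every column pair would mean either generating one such aggregate expression per pair, or widening the buffer type to carry all the counters, which is essentially what the RDD version does with zeroCounters.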
