我正在尝试在spark中实现二次排序。确切地说,对于用户会话的所有事件,我希望根据时间戳对它们进行排序。在二次排序之后,我需要遍历会话的每个事件来实现业务逻辑。我是这样做的:
def createCombiner = (row: Row) => Array(row)
def mergeValue = (rows: Array[Row], row: Row) => {
rows :+ row
}
def mergeCombiner = (rows1: Array[Row], rows2: Array[Row]) => rows1 ++ rows2
def attribute(eventsList: List[Row]): List[Row] = {
for (row: Row <- eventsList) {
// some logic
}
}
var groupedAndSortedRows = rawData.rdd.map(row => {
(row.getAs[String]("session_id"), row)
}).combineByKey(createCombiner, mergeValue, mergeCombiner)
.mapValues(_.toList.sortBy(_.getAs[String]("client_ts")))
.mapValues(attribute)
但我担心这不是最省时的方法,因为转换到RDD需要反序列化和序列化,我认为在处理数据帧/数据集时不需要这样做。
我不确定是否有一个聚合器函数可以返回整行
rawData.groupBy("session_id").someAggregateFunction()
我希望someAggregateFunction()
返回Rows
的列表。我不想在某些列上进行聚合,而是想要与session_id
相对应的整个Rows
的列表。有可能做到这一点吗?
答案是肯定的,但可能不是您所期望的。根据您的业务逻辑有多复杂,除了combineByKey之外,还有两种变体
-
如果您只需要在[spark.sql.functions][1]中定义的mean、min、max和其他已知函数
[1] :https://github.com/apache/spark/blob/v2.0.2/sql/core/src/main/scala/org/apache/spark/sql/functions.scala你当然可以用groupBy(…).agg(…)。我想你不是这样的。因此,如果你想实现自己的UDAF,那并不比combineByKey好,除非这种业务逻辑很常见,并且可以重新用于其他数据集
-
或者您需要稍微复杂的逻辑,可以使用窗口函数要使用window.partitionBy($"session_id").orderBy($"client_ts"desc)指定窗口规范,则可以轻松实现topN、移动平均值、ntile等。请参阅https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html你也可以自己实现自定义窗口agtegration功能