Scala Spark: extract shared logic into a function and use it in an aggregation



I have the following function that computes aggregations:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, when}

def compute(spark: SparkSession,
            mydf: DataFrame): DataFrame = {

  mydf
    .groupBy(col("col1"), col("col2"))
    .agg(
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name1"),
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name2"),
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name3"),
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name4"),

      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name10"),
      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name11"),
      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name12"),
      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name13")
    )
}

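As you can see, the function has a repeating pattern (I've shown 2 groups, but there are more than 10). Since this is duplicated code, I'd like to extract the common logic into a function, something like this (pseudocode):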

def compute(spark: SparkSession,
            mydf: DataFrame): DataFrame = {

  mydf
    .groupBy(col("col1"), col("col2"))
    .agg(
      func("col5", "some_string_to_check", "purchase_date", ["name1", "name2", "name3", "name4"]),
      func("col5", "some_string1", "purchase_date", ["name10", "name11", "name12", "name13"])
    )
}

def func(col, string_to_compare, date_col, array_of_name_results) = {
  count(when(col === string_to_compare, col(date_col))).as(array_of_name_results[0]),
  count(when(col === string_to_compare, col(date_col))).as(array_of_name_results[1]),
  count(when(col === string_to_compare, col(date_col))).as(array_of_name_results[2]),
  count(when(col === string_to_compare, col(date_col))).as(array_of_name_results[3])
}

Can this be done? I couldn't find any reference on whether a function can be used to build the logic inside `.agg()`. Any help would be appreciated.

If you look at the signature of `agg` (on `RelationalGroupedDataset`, which is what `groupBy` returns), you'll see:

def agg(expr: Column, exprs: Column*): DataFrame
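To see why a list of expressions has to be split into a first element and the rest, here is a minimal plain-Scala sketch (no Spark required). `aggLike` and `VarargsSplatDemo` are hypothetical names; `aggLike` only mimics the shape of this signature (one required argument followed by a varargs tail) and is not part of Spark.

```scala
object VarargsSplatDemo {
  // Hypothetical stand-in with the same shape as agg(expr: Column, exprs: Column*):
  // one mandatory argument followed by any number of extra ones.
  def aggLike(expr: String, exprs: String*): String =
    (expr +: exprs).mkString(", ")

  val aggregates: List[String] = List("count_a", "count_b", "count_c")

  // aggLike(aggregates) would not compile, because a List[String] is not a String.
  // Passing the head separately and expanding the tail with `: _*` matches the signature.
  val result: String = aggLike(aggregates.head, aggregates.tail: _*)
}
```

Keep in mind that `head` throws `NoSuchElementException` on an empty list, so a list passed this way must contain at least one expression.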

Based on this, we can build a list of aggregate `Column` expressions and pass it in:

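import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, when}

def calculate(spark: SparkSession, mydf: DataFrame): DataFrame = {
  // One count(when(...)) per output name; the parameter is named `c` so it doesn't shadow functions.col
  def func(c: Column, stringToCompare: String, dateCol: String, nameResults: Array[String]): List[Column] =
    nameResults.toList.map(name => count(when(c === stringToCompare, col(dateCol))).as(name))

  val aggregates =
    func(col("col5"), "some_string_to_check", "purchase_date", Array("name1", "name2", "name3", "name4")) ++
      func(col("col5"), "some_string1", "purchase_date", Array("name10", "name11", "name12", "name13"))
  mydf
    .groupBy(col("col1"), col("col2"))
    .agg(aggregates.head, aggregates.tail: _*)
}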

It's not quite as elegant as a plain method call, but it does let you reuse the logic encapsulated in `func`.
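With more than 10 groups, the per-group calls can themselves be driven by data. The sketch below assumes the column names from the question (`col1`, `col2`, `col5`, `purchase_date`); the `specs` table, the `SpecDrivenAggregation` object and `conditionalCounts` are hypothetical names introduced for illustration. It uses `flatMap` to turn every (value, output names) pair into `count(when(...))` columns, then expands them into `agg` with the same head/tail trick.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, count, when}

object SpecDrivenAggregation {
  // Hypothetical spec table: value to match in col5 -> output column names.
  // A real job would list all of its groups here.
  val specs: Seq[(String, Seq[String])] = Seq(
    "some_string_to_check" -> Seq("name1", "name2", "name3", "name4"),
    "some_string1"         -> Seq("name10", "name11", "name12", "name13")
  )

  // One conditional count per output name: counts rows where col5 equals `value`
  // and purchase_date is non-null (count skips the nulls produced by `when`).
  def conditionalCounts(value: String, names: Seq[String]): Seq[Column] =
    names.map(n => count(when(col("col5") === value, col("purchase_date"))).as(n))

  def compute(mydf: DataFrame): DataFrame = {
    val aggregates = specs.flatMap { case (value, names) => conditionalCounts(value, names) }
    // agg(expr, exprs*) needs at least one expression, so fail early on an empty spec.
    require(aggregates.nonEmpty, "no aggregate expressions to compute")
    mydf
      .groupBy(col("col1"), col("col2"))
      .agg(aggregates.head, aggregates.tail: _*)
  }
}
```

Adding a new group then only means adding one line to `specs`, rather than another four `count(...)` expressions.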
