Pyspark-filter,groupby,聚合不同的列和函数组合



我在Pyspark中有一个简单的操作要做,但我需要使用许多不同的参数来运行该操作。它只是在一列上进行筛选,然后按另一列进行分组,然后在第三列上进行聚合。在Python中,函数为:

def filter_gby_reduce(df, filter_col = None, filter_value = None):
return df.filter(col(filter_col) == filter_value).groupby('ID').agg(max('Value'))

假设不同的配置是

func_params = spark.createDataFrame([('Day', 'Monday'), ('Month', 'January')], ['feature', 'filter_value'])

我当然可以一个接一个地运行这些功能:

filter_gby_reduce(df, filter_col = 'Day', filter_value = 'Monday')
filter_gby_reduce(df, filter_col = 'Month', filter_value = 'January')

但我实际收集的参数要大得多。最后,我还需要将所有的函数结果union合并到一个数据帧中。那么,在spark中有没有一种方法可以更简洁地写这篇文章,并充分利用并行化的优势?

执行此操作的一种方法是使用whenmax生成所需的值作为列,并将这些值传递给agg。如果希望值统一,则必须使用stack(没有DataFrame API,因此使用selectExpr(来取消对结果的透视。根据您的数据集,如果过滤器排除了所有数据,您可能会得到null,如果需要,可以删除这些数据。

我建议测试这一点,而不是简单地统一大量过滤后的数据帧的"天真"方法。

import pyspark.sql.functions as f
func_params = [('Day', 'Monday'), ('Month', 'January')]
df = spark.createDataFrame([
('Monday', 'June', 1, 5), 
('Monday', 'January', 1, 2), 
('Monday', 'June', 1, 5),
('Monday', 'June', 2, 10)], 
['Day', 'Month', 'ID', 'Value'])

cols = []
for column, flt in func_params:
name = f'{column}_{flt}'
val = f.when(f.col(column) == flt, f.col('Value')).otherwise(None)
cols.append(f.max(val).alias(name))
stack = f"stack({len(cols)}," + ','.join(f"'{column}_{flt}', {column}_{flt}" for column, flt in func_params) + ')'
(df
.groupby('ID')
.agg(*cols)
.selectExpr('ID', stack)
.withColumnRenamed('col0', 'param')
.withColumnRenamed('col1', 'Value')
.show()
)
+---+-------------+-----+                                                       
| ID|        param|Value|
+---+-------------+-----+
|  1|   Day_Monday|    5|
|  1|Month_January|    2|
|  2|   Day_Monday|   10|
|  2|Month_January| null|
+---+-------------+-----+

最新更新