PySpark groupBy中的中位数/分位数



我想计算Spark数据帧上的组分位数(使用PySpark)。近似或确切的结果都可以。我更喜欢可以在 groupBy/agg 上下文中使用的解决方案,以便我可以将其与其他 PySpark 聚合函数混合使用。如果由于某种原因无法做到这一点,也可以采用不同的方法。

这个问题是相关的,但没有指示如何将approxQuantile用作聚合函数。

我也可以访问percentile_approx Hive UDF,但我不知道如何将其用作聚合函数。

为了具体起见,假设我有以下数据帧:

from pyspark import SparkContext
import pyspark.sql.functions as f
sc = SparkContext()    
df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))
df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()

预期成果:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+

我想你不再需要它了。但会把它留给后代(即下周我忘记的时候)。

from pyspark.sql import Window
import pyspark.sql.functions as F
grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(val, 0.5)')
df.withColumn('med_val', magic_percentile.over(grp_window))

或者为了准确解决您的问题,这也可以:

df.groupBy('grp').agg(magic_percentile.alias('med_val'))

作为奖励,您可以传递一系列百分位数:

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

你会得到一个列表作为回报。

由于您可以访问percentile_approx,一个简单的解决方案是在SQL命令中使用它:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df.registerTempTable("df")
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp")

更新:现在可以了,请参阅上面接受的答案)

<小时 />不幸的是,据

我所知,似乎不可能使用"纯"PySpark 命令来做到这一点(Shaido 的解决方案提供了 SQL 的解决方法),原因非常基本:与其他聚合函数相比,例如 meanapproxQuantile不返回Column类型, 而是一个列表

让我们看一个包含示例数据的快速示例:

spark.version
# u'2.2.0'
import pyspark.sql.functions as func
from pyspark.sql import DataFrameStatFunctions as statFunc
# aggregate with mean works OK:
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val'))
df_grp_mean.show()
# +---+--------+ 
# |grp|mean_val|
# +---+--------+
# |  B|     5.0|
# |  A|     2.0|
# +---+--------+
# try aggregating by median:
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1))
# AssertionError: all exprs should be Column
# mean aggregation is a Column, but median is a list:
type(func.mean(df['val']))
# pyspark.sql.column.Column
type(statFunc(df).approxQuantile('val', [0.5], 0.1))
# list

怀疑基于窗口的方法会有任何不同,因为正如我所说,根本原因是一个非常基本的原因。

有关更多详细信息,请参阅我的回答 此处.

似乎完全可以通过使用 percentile_approx pyspark >= 3.1.0

来解决
import pyspark.sql.functions as func    
df.groupBy("grp").agg(func.percentile_approx("val", 0.5).alias("median"))

有关更多信息,请参阅:https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.percentile_approx.html

使用 pyspark==2.4.5 执行此操作的最简单方法是:

df 
    .groupby('grp') 
    .agg(expr('percentile(val, array(0.5))')[0].alias('p50')) 
    .show()

输出:

|grp|p50|
+---+---+
|  B|5.0|
|  A|2.0|
+---+---+

"percentile_approx(val, 0.5)"的问题:例如,如果范围是 [1,2,3,4],则此函数返回 2(作为中位数),则下面的函数返回 2.5:

import statistics
median_udf = F.udf(lambda x: statistics.median(x) if bool(x) else None, DoubleType())
... .groupBy('something').agg(median_udf(F.collect_list(F.col('value'))).alias('median'))

当组中有偶数个条目时,可能上述所有答案都可能无法给出正确的答案。为了使它通用并使其在这些情况下工作,第 50 个百分位数和下一个值的平均值将是最好的。

df_grp = df.groupBy('grp').agg(
  F.percentile_approx('val', 0.5).alias('med_val'),
  ((F.percentile_approx('val', 0.5)+ F.percentile_approx('val', 0.500000000001))*.5).alias('med_val2')
  )
df_grp.show()

在上面的代码中,即使组中有偶数个条目,med_val2也会为您提供正确的中位数。数字 0.5000000000001 刚刚选择它略高于 0.5,因此它甚至适用于大型数据集。

Spark 3.4+ median (精确中位数),可以直接在 PySpark 中访问:

F.median('val')

使用示例数据帧:

df.groupBy('grp').agg(F.median('val')).show()
# +---+-----------+
# |grp|median(val)|
# +---+-----------+
# |  A|        2.0|
# |  B|        5.0|
# +---+-----------+
<小时 />其他分位数

(四分位数、百分位数等)可以使用percentilepercentile_approx

计算
  • 给定percent_rank(百分位数)值的近似值

    df = df.groupBy('grp').agg(
        F.expr('percentile_approx(val, .25)').alias('lower_quartile_approx'),
        F.expr('percentile_approx(val, .75)').alias('upper_quartile_approx'),
        F.expr('percentile_approx(val, array(.25, .5, .75))').alias('all_quartiles_approx'),
        F.expr('percentile_approx(val, .9)').alias('90th_percentile_approx'),
    )
    df.show()
    # +---+---------------------+---------------------+--------------------+----------------------+
    # |grp|lower_quartile_approx|upper_quartile_approx|all_quartiles_approx|90th_percentile_approx|
    # +---+---------------------+---------------------+--------------------+----------------------+
    # |  A|                    1|                    3|           [1, 2, 3]|                     3|
    # |  B|                    4|                    6|           [4, 5, 6]|                     6|
    # +---+---------------------+---------------------+--------------------+----------------------+
    
  • 给定percent_rank(百分位数)值的准确值:

    df = df.groupBy('grp').agg(
        F.expr('percentile(val, .25)').alias('lower_quartile_acc'),
        F.expr('percentile(val, .75)').alias('upper_quartile_acc'),
        F.expr('percentile(val, array(.25, .5, .75))').alias('all_quartiles_acc'),
        F.expr('percentile(val, .9)').alias('90th_percentile_acc'),
    )
    df.show()
    # +---+------------------+------------------+-----------------+-------------------+
    # |grp|lower_quartile_acc|upper_quartile_acc|all_quartiles_acc|90th_percentile_acc|
    # +---+------------------+------------------+-----------------+-------------------+
    # |  A|               1.5|               2.5|  [1.5, 2.0, 2.5]| 2.8000000000000003|
    # |  B|               4.5|               5.5|  [4.5, 5.0, 5.5]|  5.800000000000001|
    # +---+------------------+------------------+-----------------+-------------------+
    

相关内容

  • 没有找到相关文章

最新更新