PySpark DataFrame上分组数据的Pandas样式转换



如果我们有一个由一列类别和一列值组成的Pandas数据帧,我们可以通过以下操作删除每个类别中的平均值:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))

据我所知,Spark数据帧并没有直接提供这个group-by-transform操作(我在Spark 1.5.0上使用的是PySpark)。那么,实现这种计算的最佳方式是什么呢?

我已经尝试使用一个组by/join如下:

df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)

但它非常慢,因为据我所知,每个类别都需要对DataFrame进行全面扫描。

我认为(但尚未验证)如果我将分组的结果收集到字典中,然后在UDF中使用该字典,我可以大大加快速度,如下所示:

nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))

有没有一种惯用的方法可以在不牺牲性能的情况下表达这种类型的操作?

据我所知,每个类别都需要对DataFrame进行全面扫描。

不,没有。使用类似于aggregateByKey的逻辑来执行DataFrame聚合。请参阅DataFrame组按行为/优化较慢的部分是join,它需要排序/混洗。但它仍然不需要按组扫描。

如果这是一个您使用的精确代码,那么它会很慢,因为您没有提供联接表达式。正因为如此,它只是执行笛卡尔乘积。因此,它不仅效率低下,而且是不正确的。你想要这样的东西:

from pyspark.sql.functions import col
means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))

我认为(但尚未验证)如果我将分组的结果收集到字典中,然后在UDF 中使用该字典,我可以大大加快速度

这是可能的,尽管性能会因具体情况而异。使用Python UDF的一个问题是,它必须在Python之间移动数据。不过,这绝对值得一试。不过,您应该考虑为nameToMean使用广播变量。

有没有一种惯用的方法可以在不牺牲性能的情况下表达这种类型的操作?

在PySpark 1.6中,您可以使用broadcast函数:

df.alias("df").join(
    broadcast(means), col("df.Category") == col("means.Category"))

但在<=1.5.

您可以使用Window执行此

import pyspark.sql.functions as F
from pyspark.sql.window import Window
window_var = Window().partitionBy('Categroy')
df = df.withColumn('DemeanedValues', F.col('Values') - F.mean('Values').over(window_var))

实际上,在Spark中有一种惯用的方法,使用Hive OVER表达式。

df.registerTempTable('df')
with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df')

在发动机罩下,这是使用车窗功能。我不确定这是否比您的解决方案更快,尽管

最新更新