使用pandas-udf返回包含平均值的完整列



这很奇怪,我尝试在spark df上使用pandas-udf,只有当我选择并返回一个值,即列的平均值时,它才有效

但是如果我试图用这个值填充整列,那么就不起作用了

以下作品:

@pandas_udf(DoubleType())
def avg(col ) :
cl = np.average(col)   
return cl
df.select(avg('col' ))    

这样可以工作并返回一行的df,其中包含列的值平均值。

但是下面的不起作用

df.withColumn('avg', F.lit( avg(col))

为什么?如果avg(col(是一个值,那么为什么我不能用它来用lit((填充列呢?

就像下面的例子一样。当我返回一个常量时,这确实有效

@pandas_udf(DoubleType())
def avg(col ) :
return 5
df.withColumn('avg', avg(col)

我还试着返回了一个系列,但也不起作用

@pandas_udf(DoubleType())
def avg(col ) :
cl = np.average(col)   
return pd.Series([cl]* col.size())
df.withColumn('avg', avg(col))

不起作用。但是如果我使用常数而不是cl ,它确实有效

那么,基本上,我如何返回一个包含相同平均值的完整列,以用该值填充整个列呢?

lit在驱动程序上求值,而不是在执行器上的数据上执行。实现这一点的最佳方法是简单地为整个数据集定义一个窗口规范,并在窗口上调用聚合函数。这将消除对额外UDF的需要。

windowSpec = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('avg', avg(col).over(windowSpec))

类型将其强制转换为float()

我不确定你在这里想达到什么目的。为每一行调用UDF。因此,在UDF内部;col";表示每个单独的单元格值,但不表示整个列。

如果您的列类型为array/list:

df = spark.createDataFrame(
[
[[1.0, 2.0, 3.0, 4.0]],
[[5.0, 6.0, 7.0, 8.0]],
],
["num"]
)

@F.udf(returnType=DoubleType())
def avg(col):
import numpy as np
return float(np.average(col))
# 
df = df.withColumn("avg", avg("num"))
+--------------------+---+
|                 num|avg|
+--------------------+---+
|[1.0, 2.0, 3.0, 4.0]|2.5|
|[5.0, 6.0, 7.0, 8.0]|6.5|
+--------------------+---+

但是,如果您的列是像double/foat这样的标量类型,那么通过UDF的平均值将始终返回相同的列值:

df = spark.createDataFrame(
[[1.0],[2.0],[3.0],[4.0]],
["num"]
)

@F.udf(returnType=DoubleType())
def avg(col):
import numpy as np
return float(np.average(col))
# 
df = df.withColumn("avg", avg("num"))
+---+---+
|num|avg|
+---+---+
|1.0|1.0|
|2.0|2.0|
|3.0|3.0|
|4.0|4.0|
+---+---+

最新更新