有没有一种方法可以获得pyspark中一行的stddev/mean/sum



pyspark.sql.functions.leastpyspark.sql.functions.greatest这样的方法,但我看不到任何关于mean/stddev/sum等的方法…

我以为我可以调整DF,但这需要太多的内存:data.groupby("date").pivot("date").min()

所以我实现了以下功能:

def null_to_zero(*columns):
return [(f.when(~f.isnull(c), f.col(c)).otherwise(0)) for c in columns]

def row_mean(*columns):
N = len(columns)
columns = null_to_zero(*columns)
return sum(columns) / N

def row_stddev(*columns):
N = len(columns)
mu = row_mean(*columns)
return f.sqrt((1 / N) * sum(f.pow(col - mu, 2) for col in null_to_zero(*columns)))

day_stats = data.select(
f.least(*data.columns[:-1]).alias("min"),
f.greatest(*data.columns[:-1]).alias("max"),
row_mean(*data.columns[:-1]).alias("mean"),
row_stddev(*data.columns[:-1]).alias("stddev"),
data.columns[-1],
).show()

样品

每行的平均值

输入DF

col1|col2
1|2
2,3

输出DF

mean
1.5
2.5

有更干净的方法吗?

你可以尝试这样的东西(不确定性能方面它更好,但它更干净(:

1.将所需列合并到阵列类型

2.计算阵列的平均值

import pyspark.sql.functions as f
from pyspark.sql.types import *
sdf.withColumn("new_col", f.array( f.col("a"),f.col("b"))
.cast(ArrayType(FloatType())))
.withColumn("mean",  f.expr('aggregate(new_col, 0L, (acc,x) -> acc+x, acc -> acc /size(new_col))'))
.show(3)
+---+---+--------+----+
|  a|  b| new_col|mean|
+---+---+--------+----+
| 10| 41|[10, 41]|25.5|
|  9| 41| [9, 41]|25.0|
|  9| 41| [9, 41]|25.0|
+---+---+--------+----+
only showing top 3 rows

最新更新