有pyspark.sql.functions.least
或pyspark.sql.functions.greatest
这样的方法,但我看不到任何关于mean/stddev/sum等的方法…
我以为我可以调整DF,但这需要太多的内存:data.groupby("date").pivot("date").min()
所以我实现了以下功能:
def null_to_zero(*columns):
return [(f.when(~f.isnull(c), f.col(c)).otherwise(0)) for c in columns]
def row_mean(*columns):
N = len(columns)
columns = null_to_zero(*columns)
return sum(columns) / N
def row_stddev(*columns):
N = len(columns)
mu = row_mean(*columns)
return f.sqrt((1 / N) * sum(f.pow(col - mu, 2) for col in null_to_zero(*columns)))
day_stats = data.select(
f.least(*data.columns[:-1]).alias("min"),
f.greatest(*data.columns[:-1]).alias("max"),
row_mean(*data.columns[:-1]).alias("mean"),
row_stddev(*data.columns[:-1]).alias("stddev"),
data.columns[-1],
).show()
样品
每行的平均值
输入DF
col1|col2
1|2
2,3
输出DF
mean
1.5
2.5
有更干净的方法吗?
你可以尝试这样的东西(不确定性能方面它更好,但它更干净(:
1.将所需列合并到阵列类型
2.计算阵列的平均值
import pyspark.sql.functions as f
from pyspark.sql.types import *
sdf.withColumn("new_col", f.array( f.col("a"),f.col("b"))
.cast(ArrayType(FloatType())))
.withColumn("mean", f.expr('aggregate(new_col, 0L, (acc,x) -> acc+x, acc -> acc /size(new_col))'))
.show(3)
+---+---+--------+----+
| a| b| new_col|mean|
+---+---+--------+----+
| 10| 41|[10, 41]|25.5|
| 9| 41| [9, 41]|25.0|
| 9| 41| [9, 41]|25.0|
+---+---+--------+----+
only showing top 3 rows