我有pyspark dataframe( not pandas ) df
使用collect()
很大。因此,低于授予的代码不是有效的。它正在使用少量数据,但是现在失败了。
import numpy as np
myList = df.collect()
total = []
for product,nb in myList:
for p2,score in nb:
total.append(score)
mean = np.mean(total)
std = np.std(total)
是否有任何方法可以使用pyspark.sql.functions
或类似的方法将mean
和std
作为两个变量?
from pyspark.sql.functions import mean as mean_, std as std_
我可以使用withColumn
,但是,此方法逐行应用计算,并且它不会返回一个变量。
更新:
df
的样本内容:
+----------+------------------+
|product_PK| products|
+----------+------------------+
| 680|[[691,1], [692,5]]|
| 685|[[691,2], [692,2]]|
| 684|[[691,1], [692,3]]|
i应该计算score
值的平均值和标准偏差,例如[691,1]
中的值1
是分数之一。
您可以使用内置功能来获取汇总统计信息。这是获得卑鄙和标准偏差的方法。
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
df_stats = df.select(
_mean(col('columnName')).alias('mean'),
_stddev(col('columnName')).alias('std')
).collect()
mean = df_stats[0]['mean']
std = df_stats[0]['std']
请注意,有三种不同的标准偏差函数。我使用的文档(stddev
)返回以下内容:
聚合功能:返回公正的样本标准偏差 组中的表达
您也可以使用describe()
方法:
df.describe().show()
请参阅此链接以获取更多信息:pyspark.sql.functions
update :这是您可以通过嵌套数据工作的方式。
使用explode
将值提取到单独的行中,然后以上图所示调用mean
和stddev
。
这是MWE:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev
# mock up sample dataframe
df = sqlCtx.createDataFrame(
[(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
["product_PK", "products"]
)
# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())
# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))
.withColumn('score', get_score(col('exploded')))
.select(
_mean(col('score')).alias('mean'),
_stddev(col('score')).alias('std')
)
.collect()
mean = df_stats[0]['mean']
std = df_stats[0]['std']
print([mean, std])
输出:
[2.3333333333333335, 1.505545305418162]
您可以使用numpy
验证这些值是否正确:
vals = [1,5,2,2,1,3]
print([np.mean(vals), np.std(vals, ddof=1)])
说明:您的"products"
列是list
s的list
。调用explode
将为外部list
的每个元素做一个新的行。然后从每个爆炸行中获取"score"
值,您将其定义为2元素list
中的第二个元素。最后,在此新列上调用聚合功能。
您可以从pyspark.sql.functions
使用mean
和stddev
:
import pyspark.sql.functions as F
df = spark.createDataFrame(
[(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
["product_PK", "products"]
)
result_df = (
df
.withColumn(
'val_list',
F.array(df.products.getItem(0).getItem(1),df.products.getItem(1).getItem(1))
)
.select(F.explode('val_list').alias('val'))
.select(F.mean('val').alias('mean'), F.stddev('val').alias('stddev'))
)
print(result_df.collect())
输出:
[Row(mean=2.3333333333333335, stddev=1.505545305418162)]
您可以在此处阅读有关pyspark.sql.functions
的更多信息。
对于标准偏差,更好的写作方式如下。我们可以使用格式(至2个小数)并使用列别名名称
data_agg=SparkSession.builder.appName('Sales_fun').getOrCreate()
data=data_agg.read.csv('sales_info.csv',inferSchema=True, header=True)
from pyspark.sql.functions import *
*data.select((format_number(stddev('Sales'),2)).alias('Sales_Stdev')).show()*
如果您只想要任何列的Mean
和Std. dev
,则
我能想到的最简单的方法是使用agg
功能
获取列的平均值
df.agg({'produ': 'mean'}).show()
# or you can also use
data.agg({'balance': 'avg'}).show()
获取列的标准偏差
data.agg({'balance': 'stddev'}).show()
# and for variance you can use
data.agg({'balance': 'variance'}).show()