我正在尝试学习PySpark,并试图学习如何使用SQL当()子句,以便更好地对数据进行分类。(参见这里:https://sparkbyexamples.com/spark/spark-case-when-otherwise-example/)我似乎无法解决的是如何将实际的标量值插入when()条件中以显式地进行比较。看起来聚合函数返回的表值比实际的float()类型更多。
我不断得到这个错误消息不支持的操作数类型(s)为-:'方法'和'方法'
当我尝试运行函数来聚合原始数据帧中的另一列时,我注意到结果似乎不是一个平坦的标量表(agg(select(f.s ddev("Col"))给出的结果如下:"DataFrame[stddev_samp(TAXI_OUT): double]">)如果您想要复制,这里是我试图完成的示例,我想知道如何在when()子句中获得诸如标准偏差和平均值之类的聚合值,以便您可以使用它对新列进行分类:
samp = spark.createDataFrame(
[("A","A1",4,1.25),("B","B3",3,2.14),("C","C2",7,4.24),("A","A3",4,1.25),("B","B1",3,2.14),("C","C1",7,4.24)],
["Category","Sub-cat","quantity","cost"])
psMean = samp.agg({'quantity':'mean'})
psStDev = samp.agg({'quantity':'stddev'})
psCatVect = samp.withColumn('quant_category',.when(samp['quantity']<=(psMean-psStDev),'small').otherwise('not small')) ```
示例中的psMean和psStdev是数据框架,您需要使用collect()方法来提取标量值
psMean = samp.agg({'quantity':'mean'}).collect()[0][0]
psStDev = samp.agg({'quantity':'stddev'}).collect()[0][0]
您也可以创建一个包含所有状态的变量作为pandas DataFrame,并在稍后的pyspark代码中引用它:
from pyspark.sql import functions as F
stats = (
samp.select(
F.mean("quantity").alias("mean"),
F.stddev("quantity").alias("std")
).toPandas()
)
(
samp.withColumn('quant_category',
F.when(
samp['quantity'] <= stats["mean"].item() - stats["std"].item(),
'small')
.otherwise('not small')
)
.toPandas()
)