我有一个非常大的CSV文件,它已作为PySpark数据帧导入:df
。数据帧包含许多列,包括列ireturn
。我想计算这一列的0.99和0.01百分位数,然后将另一列添加到数据帧df
中,作为new_col_99
和new_col_01
,其中分别包含0.99和0.01%的百分位数。我写了以下代码,适用于小型数据帧,但当我将其应用于大型数据帧时,会出现错误。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("name of the file", inferSchema = True, header = True)
precentile_99 = df.selectExpr('percentile(val1, 0.99)').head(1)[0][0]
precentile_01 = df.selectExpr('percentile(val1, 0.01)').head(1)[0][0]
from pyspark.sql.functions import lit
df = df.withColumn("new_col_99", lit(precentile_99))
df = df.withColumn("new_col_01", lit(precentile_01))
我尝试用collect
替换head
,但也不起作用。我得到这个错误:
日志记录错误---
错误:py4j.java_gateway:尝试连接到java服务器时出错(127.0.0.1:49850)
Traceback(最近一次调用最后一次):。。。
我也尝试过以下操作:
percentile = df.approxQuantile('ireturn',[0.01,0.99],0.25)
df = df.withColumn("new_col_01", lit(percentile[0]))
df = df.withColumn("new_col_99", lit(percentile[1]))
上面的代码运行大约需要15-20分钟,但结果是错误的(我在列ireturn
上的数据小于1,但它返回的0.99百分位数为6789……)
迟到了,但希望能回答您的问题。你可以这样得到结果:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("name of the file", inferSchema = True, header = True)
df = df.withColumn("new_col_99", F.expr('percentile(val1, 0.99) over()'))
df = df.withColumn("new_col_01", F.expr('percentile(val1, 0.01) over()'))
对于大型数据集,percentile_approx
可能更好:
df = df.withColumn("new_col_99", F.expr('percentile_approx(val1, 0.99) over()'))
df = df.withColumn("new_col_01", F.expr('percentile_approx(val1, 0.01) over()'))