标准缩放在pyspark数据帧中花费了太多时间



我尝试过spark.ml的标准缩放器,它具有以下功能:

def standard_scale_2(df, columns_to_scale):
"""
Args:
df : spark dataframe
columns_to_scale : list of columns to standard scale
"""
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array

# UDF for converting column type from vector to double type
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

# Iterating over columns to be scaled
for i in columns_to_scale:
# VectorAssembler Transformation - Converting column to vector type
assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
# MinMaxScaler Transformation
scaler = StandardScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
# Pipeline of VectorAssembler and MinMaxScaler
pipeline = Pipeline(stages=[assembler, scaler])
# Fitting pipeline on dataframe
df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect",i).withColumnRenamed(i+"_scaled",i)
return df

我没有对每一列进行迭代,而是尝试一次缩放所有列,但也没有成功。

我也尝试过用这个简单的udf:/strong>进行标准缩放

for column in columns_to_standard_scale:
sdf = sdf.withColumn(column,
F.col(column) / sdf.agg(stddev_samp(column)).first()[0])
print(column, " completed")

我在数据块中使用带有c5d.2xlarg(16gb内存8核(节点(最多30个节点(的spark集群
火花数据帧的大小只有100k。我需要缩放大约90列。但每列大约需要10分钟才能缩放,当我试图一次性缩放所有列时,脚本甚至在2小时后都没有完成。但使用sklearn标准缩放器,熊猫中的相同数据帧几乎不需要2分钟。

我不认为代码或数据帧有任何问题,但我遗漏了一些造成瓶颈的东西,而且这个简单的操作花费了太多时间。

我在尝试构建列缩放管道时遇到了类似的问题。在我的数据集中,有400个功能,首先我想把它们作为一个单独的管道步骤添加:

stages = []    
for i,  col_to_scale in enumerate(scallarInputs):
col_scaler = StandardScaler(inputCol=col_to_scale, 
outputCol=col_to_scale+"_scaled",withStd=True, withMean=withMean)
stages += [col_scaler]
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)

对于我的数据集,运行了六个小时!

然后我决定先进行矢量组装,然后进行缩放:

stages = []
assemblerInputs = df.columns
assemblerInputs = [column for column in assemblerInputs if column not in columns_to_remove_from_assembler]
#add vector assembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features_nonscaled")
stages += [assembler]
col_scaler = StandardScaler(inputCol='features_nonscaled', outputCol='features',withStd=True, withMean=False)
stages += [col_scaler]
pipeline = Pipeline(stages = stages)
assemblerModel = pipeline.fit(df)

一切都花了17秒!

我希望这对有帮助

标准缩放火花代码没有任何问题。这是我之前不知道的spark的lazy evaluation,我认为这个标准的缩放函数有问题
实际上lazy evaluation意味着spark将等到最后一刻才执行计算指令图
在执行这个标准缩放器函数之前,我执行了一个反填充函数。这个反填充函数实际上是瓶颈,因为当我评论那个部分时,我的spark应用程序运行得很好。此外,反填充函数具有交叉联接、groubBy等wide transformations,这是非常低效的,因为它导致了大量的混洗操作。因此,我修改了该功能,因此,我的整个火花应用程序在30秒内完成

最新更新