PySpark keeps hanging on the dataset while Pandas works fine



I am following this tutorial: https://towardsdatascience.com/how-to-develop-a-credit-risk-model-and-scorecard-91335fc01f03

I implemented it successfully in Google Colab. Now I am trying to convert it to PySpark, but PySpark keeps hanging and keeps running into out-of-memory problems. Here is my code:

Download the data from Drive:

%cd "/content/drive/MyDrive/"
!gdown --id "1xaF743cmUgI5kc76I86AeZDE84SMkMnt"

Initialize the Spark session:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("program2") \
    .master("local[*]") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "16g").getOrCreate()

conf = SparkConf()
conf.set('spark.executor.memory', '10G') \
    .set('spark.driver.memory', '10G') \
    .set('spark.driver.maxResultSize', '10G') \
    .set('spark.kryoserializer.buffer.max', '128m') \
    .set('spark.kryoserializer.buffer.max.mb', '128m')

sc = SparkContext.getOrCreate(conf=conf)
sc
# config = SparkConf().setAll([('setMaster', '1') ])
# sc.stop()
# sc = SparkContext(conf=config)
df = spark.read.load("loan_data_2007_2014.csv", format="csv", sep=",", inferSchema="true", header="true")
df.rdd.getNumPartitions()

Drop columns with more than 80% null values

null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
# to_drop = [k for k, v in null_counts.items() if v > 0]
# df = df.drop(*to_drop)
count = df.count()
to_drop = []
for k, v in null_counts.items():
    if v > 0.8 * count:
        to_drop.append(k)
df = df.drop(*to_drop)

Create the label column

df = df.withColumn('good_bad', F.when(df.loan_status.isin(['Charged Off', 'Default', 'Late (31-120 days)', 'Does not meet the credit policy. Status:Charged Off']), 1).otherwise(0))

Separate the categorical and numerical columns

# label column
colLabel = "good_bad"
# numerical columns
colNum = ["member_id","loan_amnt","funded_amnt","funded_amnt_inv","int_rate","installment","annual_inc","dti","delinq_2yrs","inq_last_6mths","mths_since_last_delinq","open_acc","pub_rec","revol_bal","revol_util","total_acc","out_prncp","out_prncp_inv","total_pymnt","total_pymnt_inv","total_rec_prncp","total_rec_int","total_rec_late_fee","recoveries","collection_recovery_fee","last_pymnt_amnt","collections_12_mths_ex_med","mths_since_last_major_derog","policy_code","acc_now_delinq","tot_coll_amt","tot_cur_bal","total_rev_hi_lim"]
# categorical columns: everything that is neither the label nor numerical
colCat = [x for x in df.columns if x != colLabel and x not in colNum]

colCat

Impute the missing values

from pyspark.ml.feature import Imputer
from pyspark.sql.types import DoubleType

cols = colNum
for col in cols:
    df = df.withColumn(
        col,
        F.col(col).cast(DoubleType())
    )

imputer = Imputer().setInputCols(colNum).setOutputCols(colNum).setStrategy("median")
model = imputer.fit(df)
imputeddf = model.transform(df)

Normalize the numerical columns

from pyspark.ml.feature import VectorAssembler, StandardScaler

assembler = VectorAssembler(inputCols=cols, outputCol="features")
adf = assembler.transform(imputeddf)

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

scalerModel = scaler.fit(adf)
# Normalize each feature to have unit standard deviation.
sdf = scalerModel.transform(adf)

Label-encode the categorical columns

from pyspark.ml.feature import StringIndexer
# from pyspark.sql.functions import array_contains, col, explode
from pyspark.sql.window import Window
# colCat.append("id")
l1 = ["id", "issue_d","url","desc","earliest_cr_line","last_pymnt_d","next_pymnt_d"]
colCat = [x for x in colCat if x not in l1]
for c in colCat:
    print(c)
    ndf = sdf.withColumn(c + "_num", F.dense_rank().over(Window.orderBy(sdf[c])))
# indexer = StringIndexer(inputCols = colCat, outputCols=[k+"_num" for k in colCat])
# df = indexer.fit(df).transform(df)
# df.select([colCat[0], colCat[0]+"num"]).show(5)
# indexer.fit(df).labels
# idxHousing.select(["ocean_proximity", "ocean_proximity_num"]).show(5)
colCat

After this, my code stops working. It doesn't even execute df.show(). StringIndexer has a memory-leak issue, but writing my own encoding doesn't work either. A 4M dataset isn't that big. It works fine in Pandas, so why does it cause problems in Spark, when Spark is supposed to be built for processing large amounts of data?

Spark does not work the way Pandas does. In short, all of your computations get executed in one go (read up on lazy evaluation in Spark). I have a few suggestions for you:

  1. Avoid collect statements anywhere in your code. As with the null-column identification, you can achieve it without collecting everything to the driver. Look for a way; I'm sure it's easy (see the sketch after this list).
  2. Since you are applying transformations with MLlib, I suggest you create temporary tables after the important steps. For example, you can create one temp table after cleaning the data, another after normalization, and so on.
  3. Repartitioning/coalesce: you need to understand how your data is partitioned and how it gets shuffled. For this case, I suggest you repartition when writing the temp tables. Partitions = executors * cores.
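
A minimal sketch of what 1-3 could look like on the question's df (the num_partitions value, the /content/tmp path, and the df_cleaned name are illustrative assumptions, not part of the original post):

from pyspark.sql import functions as F

# (1) One aggregation pass: the average of the isNull flag is the null fraction,
# so only a single small row reaches the driver instead of a per-column count
# plus a separate df.count().
null_fractions = df.select(
    [F.avg(F.col(c).isNull().cast("double")).alias(c) for c in df.columns]
).first().asDict()
df = df.drop(*[c for c, frac in null_fractions.items() if frac > 0.8])

# (2) + (3) After a heavy stage, repartition to roughly executors * cores and
# materialize the intermediate result; nothing above runs until the write below,
# and the later MLlib stages can then start from a short lineage instead of
# replaying the whole plan.
num_partitions = 4  # assumption: local mode with 4 cores
df.repartition(num_partitions) \
  .write.mode("overwrite").parquet("/content/tmp/cleaned")  # path is an assumption
df_cleaned = spark.read.parquet("/content/tmp/cleaned")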

This will remove a large part of your problem and make the pipeline more scalable and robust.
