是否可以使用 pyspark 来加速对非常大的数组的每一列的回归分析?



我有一个非常大的数组。我想对数组的每一列进行线性回归。为了加快计算速度,我创建了一个列表,其中数组的每一列都作为其元素。然后,我使用 pyspark 创建了一个 RDD,并进一步在其上应用了一个定义的函数。我在创建RDD(即并行化)时遇到了内存问题。

我试图通过设置 spark-defaults.conf 将 spark.driver.memory 提高到 50g,但该程序似乎仍然死了。

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
from pyspark import SparkContext
sc = SparkContext("local", "get Linear Coefficients")
def getLinearCoefficients(column):
y=column[~np.isnan(column)] # Extract column non-nan values
x=np.where(~np.isnan(column))[0]+1 # Extract corresponding indexs plus 1
# We only do linear regression interpolation when there are no less than 3 data pairs exist.
if y.shape[0]>=3:
model=LinearRegression(fit_intercept=True) # Intilialize linear regression model
model.fit(x[:,np.newaxis],y) # Fit the model using data
n=y.shape[0]
slope=model.coef_[0]
intercept=model.intercept_
r2=r2_score(y,model.predict(x[:,np.newaxis]))
rmse=np.sqrt(mean_squared_error(y,model.predict(x[:,np.newaxis])))
else:
n,slope,intercept,r2,rmse=np.nan,np.nan,np.nan,np.nan,np.nan
return n,slope,intercept,r2,rmse
random_array=np.random.rand(300,2000*2000) # Here we use a random array without missing data for testing purpose.
columns=[col for col in random_array.T]
columnsRDD=sc.parallelize(columns)
columnsLinearRDD=columnsRDD.map(getLinearCoefficients)
n=np.array([e[0] for e in columnsLinearRDD.collect()])
slope=np.array([e[1] for e in columnsLinearRDD.collect()])
intercept=np.array([e[2] for e in columnsLinearRDD.collect()])
r2=np.array([e[3] for e in columnsLinearRDD.collect()])
rmse=np.array([e[4] for e in columnsLinearRDD.collect()])

程序输出停滞不前,如下所示。

Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:486)
at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:467)
at scala.Option.map(Option.scala:146)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:315)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:412)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:409)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:409)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:396)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396)
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

我想可以使用 pyspark 来加快计算速度,但我该怎么做呢?修改 spark-defaults.conf 中的其他参数?或者矢量化数组的每一列(我知道 Python3 中的 range() 函数是这样做的,而且它真的更快。

那样行不通。你基本上在做三件事:

  1. 您正在使用RDD进行并行化,
  2. 你正在调用你的getLinearCoefficients()函数,最后
  3. 你调用 collect() 来使用你现有的代码。

第一点没有错,但第二步和第三步有很大的错误。你的getLinearCoefficients()函数不能从pyspark中受益,因为你使用numpy和sklearn(看看这篇文章以获得更好的解释)。对于您正在使用的大多数函数,都有一个 pyspark 等效项。 第三步的问题是 collect() 函数。当你调用collect()时,pyspark会将RDD的所有行带到驱动程序中,并在那里执行sklearn函数。因此,您只能获得 sklearn 允许的并行化。使用 pyspark 以您目前的方式完全没有意义,甚至可能是一个缺点。Pyspark不是一个允许你并行运行python代码的框架。当你想与pyspark并行执行代码时,你必须使用pyspark函数。

那你能做什么?

  • 首先,您可以使用 LinearRegession 类的 n_jobs 参数来使用多个内核进行计算。这允许您至少使用一台计算机的所有内核。
  • 你可以做的另一件事是离开sklearn并使用pyspark的linearRegression(看看指南和api)。有了这个,您可以使用整个集群进行线性回归。

对于样本超过 100k 的大型数据集,不鼓励使用LinearRegression。一般建议是使用SGDRegressor并正确设置参数,以便使用 OLS 损耗:

from sklearn.linear_model import SGDRegressor

并将您的LinearRegression替换为:

model = SGDRegressor(loss=’squared_loss’, penalty=’none’, fit_intercept=True)

设置loss=’squared_loss’penalty=’none’SGDRegressor设置为使用 OLS 而不进行正则化,因此它应该产生类似于LinearRegression的结果。

尝试一些选项,例如learning_rateeta0/power_t,以找到最佳的性能。

此外,我建议使用train_test_split来拆分数据集并使用测试集进行评分。一个好的测试大小是test_size=.3.

相关内容

最新更新