我最近一直在尝试了解Apache Spark作为Scikit Learn的替代品,但是在我看来,即使在简单的情况下,Scikit收敛到准确的模型的速度也比Spark快得多。例如,我使用以下脚本为一个非常简单的线性函数 (z=x+y) 生成了 1000 个数据点:
from random import random
def func(in_vals):
'''result = x (+y+z+w....)'''
result = 0
for v in in_vals:
result += v
return result
if __name__ == "__main__":
entry_count = 1000
dim_count = 2
in_vals = [0]*dim_count
with open("data_yequalsx.csv", "w") as out_file:
for entry in range(entry_count):
for i in range(dim_count):
in_vals[i] = random()
out_val = func(in_vals)
out_file.write(','.join([str(x) for x in in_vals]))
out_file.write(",%sn" % str(out_val))
然后我运行了以下 Scikit 脚本:
import sklearn
from sklearn import linear_model
import numpy as np
data = []
target = []
with open("data_yequalsx.csv") as inFile:
for row in inFile:
vals = row.split(",")
data.append([float(x) for x in vals[:-1]])
target.append(float(vals[-1]))
test_samples= len(data)/10
train_data = [0]*(len(data) - test_samples)
train_target = [0]*(len(data) - test_samples)
test_data = [0]*(test_samples)
test_target = [0]*(test_samples)
train_index = 0
test_index = 0
for j in range(len(data)):
if j >= test_samples:
train_data[train_index] = data[j]
train_target[train_index] = target[j]
train_index += 1
else:
test_data[test_index] = data[j]
test_target[test_index] = target[j]
test_index += 1
model = linear_model.SGDRegressor(n_iter=100, learning_rate="invscaling", eta0=0.0001, power_t=0.5, penalty="l2", alpha=0.0001, loss="squared_loss")
model.fit(train_data, train_target)
print(model.coef_)
print(model.intercept_)
result = model.predict(test_data)
mse = np.mean((result - test_target) ** 2)
print("Mean Squared Error = %s" % str(mse))
然后这个 Spark 脚本:(使用 spark-submit,没有其他参数)
from pyspark.mllib.regression import LinearRegressionWithSGD, LabeledPoint
from pyspark import SparkContext
sc = SparkContext (appName="mllib_simple_accuracy")
raw_data = sc.textFile ("data_yequalsx.csv", minPartitions=10) #MinPartitions doesnt guarantee that you get that many partitions, just that you wont have fewer than that many partitions
data = raw_data.map(lambda line: [float(x) for x in line.split (",")]).map(lambda entry: LabeledPoint (entry[-1], entry[:-1])).zipWithIndex()
test_samples= data.count()/10
training_data = data.filter(lambda (entry, index): index >= test_samples).map(lambda (lp,index): lp)
test_data = data.filter(lambda (entry, index): index < test_samples).map(lambda (lp,index): lp)
model = LinearRegressionWithSGD.train(training_data, step=0.01, iterations=100, regType="l2", regParam=0.0001, intercept=True)
print(model._coeff)
print(model._intercept)
mse = (test_data.map(lambda lp: (lp.label - model.predict(lp.features))**2 ).reduce(lambda x,y: x+y))/test_samples;
print("Mean Squared Error: %s" % str(mse))
sc.stop ()
奇怪的是,尽管两个模型的设置几乎相同(据我所知),但火花给出的误差比Scikit给出的误差大一个数量级(分别为0.185和0.045)。我知道这是使用 SGD 的迭代次数很少,因此结果可能会有所不同,但我不会想到它会接近如此大的差异或如此大的错误,特别是考虑到异常简单的数据。
我在 Spark 中有什么误解吗?配置不正确吗?当然,我应该得到比这更小的错误吗?
SGD代表随机梯度下降,是一种在线凸优化算法,因此很难并行化,因为它每次迭代都会进行一次更新(有更聪明的变体,例如小批量的SGD,但对于并行环境仍然不是很好。
另一方面,批处理算法,例如L-BFGS,我建议您将其与Spark(LogigisticRegressionWithLBFGS)一起使用,可以很容易地并行化,因为它可以按epoch进行迭代(它需要查看所有数据点,计算每个点的损失函数的值和梯度,然后执行聚合以计算完整的梯度)。
Python在一台机器上运行,因此SGD表现良好。
顺便说一下,如果你查看MLlib代码,相当于scikit learn的lambda是数据集的lambda/size(mllib优化1/n*sum(l_i(x_i,f(y_i)) + lambda
而scikit learn优化sum(l_i(x_i,f(y_i)) + lambda
由于 Spark 是并行的,因此在计算过程中,每个节点都需要能够独立于其他节点工作,以避免节点之间出现 [时间] 昂贵的洗牌。因此,它使用称为随机梯度下降的过程来接近最小值,该过程遵循局部梯度向下。
解决[简单,最小二乘]回归问题的"精确"方法涉及求解矩阵方程。这可能是Scikit-Learn正在做的事情,所以在这种情况下它会更准确。
权衡是,对于大小为 N 的方阵,求解矩阵方程通常缩放为 N^3,这对于大型数据集来说很快就会变得不可行。Spark 将准确性换成了计算能力。与任何机器学习过程一样,您应该在整个算法中构建大量健全性检查,以确保上一步的结果有意义。
希望这有帮助!