使用HyperOpt和SparkTrials时,如何在工作节点上创建spark数据帧



我正试图在Databricks上使用HyperOpt和SparkTrials并行运行ML试验。

我的opjective函数使用spark.createDataFrame(results)将输出转换为spark数据帧(以重用我之前创建的一些预处理代码——我不想重写它(。

然而,当试图使用HyperOpt和SparkTrials时,这会导致错误,因为SparkContext用于创建数据帧";应该只在驱动程序上创建或访问";。有什么方法可以在这里的目标函数中创建一个sparkDataFrame吗?

例如:

from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK, Trials
from pyspark.sql import SparkSession
# If you are running Databricks Runtime for Machine Learning, `mlflow` is already installed and you can skip the following line. 
import mlflow
# Load the iris dataset from scikit-learn
iris = iris = load_iris()
X = iris.data
y = iris.target
def objective(C):
# Create a support vector classifier model
clf = SVC(C)
# THESE TWO LINES CAUSE THE PROBLEM
ss = SparkSession.builder.getOrCreate()
sdf = ss.createDataFrame([('Alice', 1)])
# Use the cross-validation accuracy to compare the models' performance
accuracy = cross_val_score(clf, X, y).mean()

# Hyperopt tries to minimize the objective function. A higher accuracy value means a better model, so you must return the negative accuracy.
return {'loss': -accuracy, 'status': STATUS_OK}
search_space = hp.lognormal('C', 0, 1.0)
algo=tpe.suggest
# THIS WORKS (It's not using SparkTrials)
argmin = fmin(
fn=objective,
space=search_space,
algo=algo,
max_evals=16)
from hyperopt import SparkTrials
spark_trials = SparkTrials()
# THIS FAILS
argmin = fmin(
fn=objective,
space=search_space,
algo=algo,
max_evals=16,
trials=spark_trials)

我试过研究这个问题,但它解决了一个不同的问题——我看不出有什么明显的方法可以将它应用于我的情况。如何在代码的任何位置获取当前SparkSession?

我认为简单的答案是这是不可能的。火花上下文只能存在于驱动程序节点上。创建一个新实例将是一种嵌套,请参阅此相关问题。

在Spark中嵌套并行化?什么';这是正确的方法吗?

最后,我通过重写pandas中的转换解决了我的问题,这将起作用。

如果转换对于单个节点来说太大,那么您可能必须预先计算它们,并让hyperopt选择哪个版本作为优化的一部分。

最新更新