sc.parallelize 不使用训练算法在 ML 管道中工作



使用 org.apache.spark.mllib 学习算法,我们过去常常在没有训练算法的情况下设置管道

var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler
val pipeline = new Pipeline().setStages(stages)

然后在我们使用 LabeledPoint 为训练算法准备好数据之后,最后我们曾经使用类似的东西来训练模型

val model = GradientBoostedTrees.train(sc.parallelize(trainingData.collect()), boostingStrategy)

我们必须注意,如果我们使用"sc.parallelize",训练似乎永远不会结束。

现在有了 org.apache.spark.ml 学习算法(由于setLabelCol和setFeaturesCol),我们可以将训练算法也包含在管道中。

val model = new GBTRegressor().setLabelCol(target_col_name).setFeaturesCol("features").setMaxIter(10)
var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler :+ model
val pipeline = new Pipeline().setStages(stages)

但是现在当我们传递数据时,它除了一个数据帧,而不是像sc.parallelize那样的数据行。所以下面的代码

val model = pipeline.fit(sc.parallelize(df_train))

引发以下错误:

<console>:57: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
 required: Seq[?]

虽然这

val model = pipeline.fit(df_train)

永无止境。

这个问题的解决方案是什么?

代码的主要问题是使用驱动程序作为数据的桥接器。 即,您正在收集所有分布式数据到您的驱动程序并将其传递回您的所有节点。另一个问题是您实际上正在使用ML功能,这意味着您必须使用 DataFrame s 而不是 RDD s。因此,您需要做的是 将您的RDD转换为 DataFrame .请注意,有很多方法可以实现这一点,您可以查看如何在Spark中将RDD对象转换为DataFrame,另一种方法是使用toDF方法。

最新更新