我有一个scala类,旨在概括线性模型的某些功能 - 特定于用户应该能够创建一个具有系数和一系列预测变量和数组的实例该类从数据框架中汲取数据,并使用简单的线性模型在整个数据框架上创建预测。
我被困在最后一行中...我希望这会生成预测值的一列。我尝试了多种方法(除了其中一种方法都被评论了)。现在的代码不会编译类型不匹配的b/c:
[error] found : Array[org.apache.spark.sql.Column]
[error] required: org.apache.spark.sql.Column
[error] .withColumn("prediction", colMod(preds.map(p => data(p))))
[error] ^
...我也可以进入pred< - preds版本...和foreach版本:
[error] found : Unit
[error] required: org.apache.spark.sql.Column
[error] .withColumn("prediction", colMod(preds.foreach(data(_))))
[error] ^
一直在徒劳地解决……将感谢任何建议。
class LinearModel(coefficients: Array[Double],
predictors: Array[String],
data: DataFrame) {
val coefs = coefficients
val preds = Array.concat(Array("bias"), predictors)
require(coefs.length == preds.length)
/**
* predict: computes linear model predictions as the dot product of the coefficents and the
* values (X[i] in the model matrix)
* @param values: the values from a single row of the given variables from model matrix X
* @param coefs: array of coefficients to be applied to each of the variables in values
* (the first coef is assumed to be 1 for the bias/intercept term)
* @return: the predicted value
*/
private def predict(values: Array[Double], coefs: Array[Double]): Unit = {
(for ((c, v) <- coefs.zip(values)) yield c * v).sum
}
/**
* colMod (udf): passes the values for each relevant value to predict()
* @param values: an Array of the numerical values of each of the specified predictors for a
* given record
*/
private val colMod = udf((values: Array[Double]) => predict(values, coefs))
val dfPred = data
// create the column with the prediction
.withColumn("prediction", colMod(preds.map(p => data(p))))
//.withColumn("prediction", colMod(for (pred <- preds) yield data(pred)))
//.withColumn("prediction", colMod(preds.foreach(data(_))))
// prev line should = colMod(data(pred1), data(pred2), ..., data(predn))
}
这是如何正确完成的方式:
import org.apache.spark.sql.functions.{lit, col}
import org.apache.spark.sql.Column
def predict(coefficients: Seq[Double], predictors: Seq[String], df: DataFrame) = {
// I assume there is no predictor for bias
// but you can easily correct for that
val prediction: Column = predictors.zip(coefficients).map {
case (p, c) => col(p) * lit(c)
}.foldLeft(col("bias"))(_ + _)
df.withColumn("prediction", prediction)
}
示例用法:
val df = Seq((1.0, -1.0, 3.0, 5.0)).toDF("bias", "x1", "x2", "x3")
predict(Seq(2.0, 3.0), Seq("x1", "x3"), df)
结果是:
+----+----+---+---+----------+
|bias| x1| x2| x3|prediction|
+----+----+---+---+----------+
| 1.0|-1.0|3.0|5.0| 14.0|
+----+----+---+---+----------+
关于您的代码,您犯了许多错误:
-
Array[_]
不是ArrayType
列的有效外部类型。有效的外部表示为Seq[_]
,因此您传递给udf
的功能的参数应为Seq[Double]
。 - 传递给
udf
的功能不能为Unit
。在您的情况下,应该是Double
。结合上几点,有效的签名将是(Seq[Double], Seq[Double]) => Double
。 colMod
期望Column
类型的单一参数。import org.apache.spark.sql.functions.array colMod(array(preds.map(col): _*))
您的代码不是
NULL
/null
安全。