我刚刚使用了Standard Scaler来规范化我的ML应用程序的特性。在选择缩放的特征之后,我想将其转换回double的数据帧,尽管我的向量的长度是任意的。我知道如何通过使用
为特定的3个功能做到这一点myDF.map{case Row(v: Vector) => (v(0), v(1), v(2))}.toDF("f1", "f2", "f3")
而不是任意数量的特征。有简单的方法吗?
的例子:
val testDF = sc.parallelize(List(Vectors.dense(5D, 6D, 7D), Vectors.dense(8D, 9D, 10D), Vectors.dense(11D, 12D, 13D))).map(Tuple1(_)).toDF("scaledFeatures")
val myColumnNames = List("f1", "f2", "f3")
// val finalDF = DataFrame[f1: Double, f2: Double, f3: Double]
编辑我发现了如何在创建数据框时解包到列名,但仍然有麻烦将向量转换为创建数据框所需的序列:
finalDF = testDF.map{case Row(v: Vector) => v.toArray.toSeq /* <= this errors */}.toDF(List("f1", "f2", "f3"): _*)
> =火花3.0.0
从Spark 3.0开始,你可以使用vector_to_array
import org.apache.spark.ml.functions.vector_to_array
testDF.select(vector_to_array($"scaledFeatures").alias("_tmp")).select(exprs:_*)
火花& lt;3.0.0
一种可能的方法类似于
import org.apache.spark.sql.functions.udf
// In Spark 1.x you'll will have to replace ML Vector with MLLib one
// import org.apache.spark.mllib.linalg.Vector
// In 2.x the below is usually the right choice
import org.apache.spark.ml.linalg.Vector
// Get size of the vector
val n = testDF.first.getAs[Vector](0).size
// Simple helper to convert vector to array<double>
// asNondeterministic is available in Spark 2.3 or befor
// It can be removed, but at the cost of decreased performance
val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic
// Prepare a list of columns to create
val exprs = (0 until n).map(i => $"_tmp".getItem(i).alias(s"f$i"))
testDF.select(vecToSeq($"scaledFeatures").alias("_tmp")).select(exprs:_*)
如果你知道前面的列列表,你可以简化一下:
val cols: Seq[String] = ???
val exprs = cols.zipWithIndex.map{ case (c, i) => $"_tmp".getItem(i).alias(c) }
关于Python等效内容,请参见如何将Vector拆分为列-使用PySpark。
请尝试VectorSlicer:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((1, 0.2, 0.8), (2, 0.1, 0.9), (3, 0.3, 0.7))
).toDF("id", "negative_logit", "positive_logit")
val assembler = new VectorAssembler()
.setInputCols(Array("negative_logit", "positive_logit"))
.setOutputCol("prediction")
val output = assembler.transform(dataset)
output.show()
/*
+---+--------------+--------------+----------+
| id|negative_logit|positive_logit|prediction|
+---+--------------+--------------+----------+
| 1| 0.2| 0.8| [0.2,0.8]|
| 2| 0.1| 0.9| [0.1,0.9]|
| 3| 0.3| 0.7| [0.3,0.7]|
+---+--------------+--------------+----------+
*/
val slicer = new VectorSlicer()
.setInputCol("prediction")
.setIndices(Array(1))
.setOutputCol("positive_prediction")
val posi_output = slicer.transform(output)
posi_output.show()
/*
+---+--------------+--------------+----------+-------------------+
| id|negative_logit|positive_logit|prediction|positive_prediction|
+---+--------------+--------------+----------+-------------------+
| 1| 0.2| 0.8| [0.2,0.8]| [0.8]|
| 2| 0.1| 0.9| [0.1,0.9]| [0.9]|
| 3| 0.3| 0.7| [0.3,0.7]| [0.7]|
+---+--------------+--------------+----------+-------------------+
*/
几天前出现的替代解决方案:将VectorDisassembler
导入到您的项目中(只要它没有合并到Spark中),现在:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 1.2, 1.3), (1, 2.2, 2.3), (2, 3.2, 3.3))
).toDF("id", "val1", "val2")
val assembler = new VectorAssembler()
.setInputCols(Array("val1", "val2"))
.setOutputCol("vectorCol")
val output = assembler.transform(dataset)
output.show()
/*
+---+----+----+---------+
| id|val1|val2|vectorCol|
+---+----+----+---------+
| 0| 1.2| 1.3|[1.2,1.3]|
| 1| 2.2| 2.3|[2.2,2.3]|
| 2| 3.2| 3.3|[3.2,3.3]|
+---+----+----+---------+*/
val disassembler = new org.apache.spark.ml.feature.VectorDisassembler()
.setInputCol("vectorCol")
disassembler.transform(output).show()
/*
+---+----+----+---------+----+----+
| id|val1|val2|vectorCol|val1|val2|
+---+----+----+---------+----+----+
| 0| 1.2| 1.3|[1.2,1.3]| 1.2| 1.3|
| 1| 2.2| 2.3|[2.2,2.3]| 2.2| 2.3|
| 2| 3.2| 3.3|[3.2,3.3]| 3.2| 3.3|
+---+----+----+---------+----+----+*/
我使用Spark 2.3.2,并构建了一个xgboost4j二进制分类模型,结果如下所示:
results_train.select("classIndex","probability","prediction").show(3,0)
+----------+----------------------------------------+----------+
|classIndex|probability |prediction|
+----------+----------------------------------------+----------+
|1 |[0.5998525619506836,0.400147408246994] |0.0 |
|1 |[0.5487841367721558,0.45121586322784424]|0.0 |
|0 |[0.5555324554443359,0.44446757435798645]|0.0 |
我定义了下面的udf来获取向量列概率
的元素import org.apache.spark.sql.functions._
def getProb = udf((probV: org.apache.spark.ml.linalg.Vector, clsInx: Int) => probV.apply(clsInx) )
results_train.select("classIndex","probability","prediction").
withColumn("p_0",getProb($"probability",lit(0))).
withColumn("p_1",getProb($"probability", lit(1))).show(3,0)
+----------+----------------------------------------+----------+------------------+-------------------+
|classIndex|probability |prediction|p_0 |p_1 |
+----------+----------------------------------------+----------+------------------+-------------------+
|1 |[0.5998525619506836,0.400147408246994] |0.0 |0.5998525619506836|0.400147408246994 |
|1 |[0.5487841367721558,0.45121586322784424]|0.0 |0.5487841367721558|0.45121586322784424|
|0 |[0.5555324554443359,0.44446757435798645]|0.0 |0.5555324554443359|0.44446757435798645|
希望这对那些处理矢量类型输入的人有帮助。
由于上述答案需要额外的库或仍然不支持,我使用pandas dataframe轻松提取向量值,然后将其转换回spark dataframe
# convert to pandas dataframe
pandasDf = dataframe.toPandas()
# add a new column
pandasDf['newColumnName'] = 0 # filled the new column with 0s
# now iterate through the rows and update the column
for index, row in pandasDf.iterrows():
value = row['vectorCol'][0] # get the 0th value of the vector
pandasDf.loc[index, 'newColumnName'] = value # put the value in the new column