Actually I am working with Spark 2.0.2 and I would like to know how, for example, to run a Spark ML logistic regression: I want to put each row of a DataFrame into a vector that will be fed to the logistic regression. Could you help me turn each row of the DataFrame into a dense vector? Thanks. Here is what I did to get the DataFrame:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.hadoop.fs.shell.Display
object Example extends App {
val sparkSession = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
val data=sparkSession.read.option("header", "true").csv("C://sample_lda_data.csv").toDF()
val data2=data.select("col2","col3","col4","col5","col6","col7","col8","col9")
Finally, I would like to get something like the following as input for the logistic regression, where the first position is the first column of the DataFrame. Please provide any help.
import org.apache.spark.ml.feature.VectorAssembler

val data = sparkSession.read.option("header", "true").csv("C://sample_lda_data.csv").toDF()
val data2 = data.select("col2","col3","col4","col5","col6","col7","col8","col9")
val assembler = new VectorAssembler().setInputCols(Array("col2", "col3", "col4")).setOutputCol("features")
val output = assembler.transform(data2)
main" java.lang.IllegalArgumentException: Data type StringType is not supported.
I would really appreciate any help. Thank you all.
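The IllegalArgumentException is expected here: spark.read.csv reads every column as a string, and VectorAssembler only accepts numeric, boolean, or vector input columns. A minimal sketch of one way to get past it, reusing the column names and data2 from the snippet above, is to cast the columns to DoubleType before assembling:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// Cast the string columns produced by csv() to doubles first.
val featureCols = Array("col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9")
val numeric = featureCols.foldLeft(data2)((df, c) => df.withColumn(c, col(c).cast(DoubleType)))

// The assembler now sees DoubleType columns and no longer throws.
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val output = assembler.transform(numeric)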
You can use the array function and then map to LabeledPoints:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import scala.collection.mutable

// create an array column from all but the first one:
val arrayCol: Column = array(df.columns.drop(1).map(col).map(_.cast(DoubleType)): _*)

// select the first column and the array column, and map them into LabeledPoints
val result: Dataset[LabeledPoint] = df.select(col("col1").cast(DoubleType), arrayCol)
  .map(r => LabeledPoint(
    r.getAs[Double](0),
    Vectors.dense(r.getAs[mutable.WrappedArray[Double]](1).toArray)
  ))
// You can use the Dataset or the RDD
result.show()
// +-----+---------------------+
// |label|features |
// +-----+---------------------+
// |1.0 |[2.0,3.0,4.0,0.5] |
// |11.0 |[12.0,13.0,14.0,15.0]|
// |21.0 |[22.0,23.0,24.0,25.0]|
// +-----+---------------------+
result.rdd.foreach(println)
// (1.0,[2.0,3.0,4.0,0.5])
// (21.0,[22.0,23.0,24.0,25.0])
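Since the question asks about spark.ml's LogisticRegression, one possible follow-up (a sketch of my own, not part of the answer above) is to convert the mllib vector column produced here to the new ml vector type and fit the estimator on it. This assumes the label column holds valid class values (0.0/1.0 for the binary-only implementation in Spark 2.0):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.util.MLUtils

// result.toDF() has "label" and "features" columns, but the features are
// old mllib vectors; convertVectorColumnsToML rewrites them as ml vectors.
val mlData = MLUtils.convertVectorColumnsToML(result.toDF(), "features")

// Fit a spark.ml logistic regression on the converted frame.
val lrModel = new LogisticRegression().setMaxIter(10).fit(mlData)
println(lrModel.coefficients)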
I wrote code to convert a DataFrame's numeric columns into a dense vector. Please find the code below. Note: here Col1 and col2 are numeric columns.
import sparkSession.implicits._

val result: Dataset[LabeledPoint] = df.map { x =>
  LabeledPoint(x.getAs[Integer]("Col1").toDouble, Vectors.dense(x.getAs[Double]("col2")))
}
result.show()
result.printSchema()
+-------+----------+
|  label|  features|
+-------+----------+
|31825.0| [75000.0]|
|58784.0| [24044.0]|
|  121.0| [41000.0]|
+-------+----------+
root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)
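The snippet above packs only a single column into the feature vector; a hedged sketch of the same idea generalised to several numeric columns (the column names below are assumptions) could look like this:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.Dataset
import sparkSession.implicits._

// Hypothetical feature column names; adjust to the real schema,
// and note that each column is assumed to already be DoubleType.
val featureCols = Seq("col2", "col3", "col4")
val multiCol: Dataset[LabeledPoint] = df.map { row =>
  LabeledPoint(
    row.getAs[Integer]("Col1").toDouble,
    Vectors.dense(featureCols.map(c => row.getAs[Double](c)).toArray)
  )
}
multiCol.show()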