MinMax normalization in Scala



I have an org.apache.spark.sql.DataFrame with multiple columns. I want to scale one column (lat_long_dist) using MinMax normalization, or any other technique, so that the values fall between -1 and 1, while keeping the result as an org.apache.spark.sql.DataFrame.

scala> val df = sqlContext.csvFile("tenop.csv")
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string,
  ip_crowding: string, lat_long_dist: double, stream_name_1: string]

I found the StandardScaler option, but that requires converting the dataset before I can apply the transformation. Is there a simple, clean way to do this?
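
Roughly, the StandardScaler route would look like the sketch below (assuming Spark 1.x, the mllib StandardScaler, and a numeric lat_long_dist column): the column has to be pulled out as an RDD[Vector], and the result is an RDD rather than a DataFrame, which is what I would like to avoid.

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors

// Pull the single column out of the DataFrame as an RDD[Vector]
val vectors = df.select("lat_long_dist").rdd.map(r => Vectors.dense(r.getDouble(0)))

// Fit and apply the scaler; the output is an RDD[Vector], not a DataFrame
val scalerModel = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val standardized = scalerModel.transform(vectors)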

Since you are already working with Spark, here is another suggestion.

Why not use the MinMaxScaler from the ml package?

Let's try it on the same example zero323 used.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf
val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v"))))
val vectorizeCol = udf( (v:Double) => Vectors.dense(Array(v)) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))
val scaler = new MinMaxScaler()
    .setInputCol("vVec")
    .setOutputCol("vScaled")
    .setMax(1)
    .setMin(-1)
scaler.fit(df2).transform(df2).show
+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+
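
If you need the scaled values back as a plain Double column rather than a one-element vector (an assumption about what you want downstream), a small UDF can unwrap them; a sketch under the same Spark 1.x / mllib setup as above:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Pull the single element back out of each one-element vector
val firstElem = udf((v: Vector) => v(0))
val scaledDf = scaler.fit(df2).transform(df2)
scaledDf.withColumn("vScaledDouble", firstElem(col("vScaled"))).show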

You can also take advantage of scaling multiple columns at once.

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0),
    (2.0, 0.0, 0.0),
    (0.0, 1.0, -1.0)
)).toDF("a", "b", "c")
import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
    .setInputCols(Array("a", "b", "c"))
    .setOutputCol("features")
val df2 = assembler.transform(df)
// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
+---+----+----+--------------+--------------------+
|  a|   b|   c|      features|      scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|      [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|     [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+
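
If you prefer a single fit/transform call, the assembler and the scaler can also be chained in a Pipeline; a minimal sketch reusing the two stages defined above:

import org.apache.spark.ml.Pipeline

// One fit() handles both the assembling and the scaling stage
val pipeline = new Pipeline().setStages(Array(assembler, scaler))
pipeline.fit(df).transform(df).show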

I think what you want is something like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}
val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
  case Row(x: Double, y: Double) => (x, y)
}
val scaledRange = lit(2) // Range of the scaled variable
val scaledMin = lit(-1)  // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to (0, 1) range
val vScaled = scaledRange * vNormalized + scaledMin
df.withColumn("vScaled", vScaled).show
// +---+-----+--------------------+
// |  k|    v|             vScaled|
// +---+-----+--------------------+
// |  1|  0.5| -0.3093093093093092|
// |  2| 10.2| 0.27327327327327344|
// |  3|  5.7|0.003003003003003...|
// |  4|-11.0|                -1.0|
// |  5| 22.3|                 1.0|
// +---+-----+--------------------+
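
The same expression can be wrapped in a small helper so that any numeric column can be rescaled to an arbitrary range (minMaxScale is a hypothetical name used here for illustration, not a Spark API):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, lit, max, min}

// Hypothetical helper: linearly rescale inCol into [newMin, newMax]
def minMaxScale(df: DataFrame, inCol: String, outCol: String,
                newMin: Double = -1.0, newMax: Double = 1.0): DataFrame = {
  val Row(lo: Double, hi: Double) =
    df.agg(min(col(inCol)).cast("double"), max(col(inCol)).cast("double")).first
  val scaled = (col(inCol) - lo) / (hi - lo) * lit(newMax - newMin) + lit(newMin)
  df.withColumn(outCol, scaled)
}

// minMaxScale(df, "v", "vScaled").show should reproduce the output above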

Here is yet another solution. The code is borrowed from Matt, Lyle and zero323, thanks!

import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}
val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val df2 = assembler.transform(df)
val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)
scaler.fit(df2).transform(df2).show

Result:

+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+

By the way: the other solutions produced the following error on my side.

java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
  at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
  at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
  at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
  ... 50 elided
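
The two types in that message only look identical because org.apache.spark.mllib.linalg.VectorUDT and org.apache.spark.ml.linalg.VectorUDT print the same underlying schema; on Spark 2.x, ml.feature.MinMaxScaler expects the new ml.linalg vectors. Assuming Spark 2.x, the UDF approach above should also work if the vectors are built with ml.linalg.Vectors, e.g.:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.udf

// Build the one-element vectors with the ml (not mllib) Vectors factory
val vectorizeCol = udf((v: Double) => Vectors.dense(v))
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))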

Many thanks!
