将列对象缩放为单列数据帧



>我有一个这样的数据帧:

val df = Seq(
("a", Seq(2.0)),
("a", Seq(1.0)),
("a", Seq(0.5)),
("b", Seq(24.0)),
("b", Seq(12.5)),
("b", Seq(6.4)),
("b", Seq(3.2)),
("c", Seq(104.0)),
("c", Seq(107.4))
).toDF("key", "value")

我需要使用一种算法,该算法在不同组上输入数据帧对象。 为了更清楚地说明这一点,假设我必须使用按组缩放的标准缩放器。

在熊猫中,我会做这样的事情(过程中的许多类型更改(:

from sklearn.preprocessing import StandardScaler
df.groupby(key) 
.value 
.transform(lambda x: StandardScaler 
.fit_transform(x 
.values 
.reshape(-1,1)) 
.reshape(-1))

我需要在 scala 中执行此操作,因为我需要使用的算法不是 Scaler,而是 scala 中内置的另一件事。

到目前为止,我已经尝试做这样的事情:

import org.apache.spark.ml.feature.StandardScaler
def f(X : org.apache.spark.sql.Column) : org.apache.spark.sql.Column = {  
val scaler = new StandardScaler()
.setInputCol("value")
.setOutputCol("scaled")
val output = scaler.fit(X)("scaled")
(output)
}
df.withColumn("scaled_values", f(col("features")).over(Window.partitionBy("key")))

但当然,它给了我一个错误:

command-144174313464261:21:错误:类型不匹配; 找到 : org.apache.spark.sql.Column 必需: org.apache.spark.sql.Dataset[_] val output = scaler.fit(X(("scaled"(

所以我正在尝试将单个列对象转换为数据帧对象,但没有成功。我该怎么做?

如果不可能,是否有任何解决方法可以解决此问题?

更新 1

似乎我在代码中犯了一些错误,我试图修复它(我认为我做对了(:

val df = Seq(
("a", 2.0),
("a", 1.0),
("a", 0.5),
("b", 24.0),
("b", 12.5),
("b", 6.4),
("b", 3.2),
("c", 104.0),
("c", 107.4)
).toDF("key", "value")

def f(X : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.Column = {  
val assembler = new VectorAssembler()
.setInputCols(Array("value"))
.setOutputCol("feature")
val scaler = new StandardScaler()
.setInputCol("feature")
.setOutputCol("scaled")
val pipeline = new Pipeline()
.setStages(Array(assembler, scaler))
val output = pipeline.fit(X).transform(X)("scaled")
(output)
}  
someDF.withColumn("scaled_values", f(someDF).over(Window.partitionBy("key")))

我仍然收到错误:

org.apache.spark.sql.AnalysisException: 表达式 'scaled#1294' not 在窗口函数中受支持。

我不确定此错误的原因,我尝试为列添加别名,但它似乎不起作用。

所以我试图将单个列对象转换为数据帧对象,但没有成功。我该怎么做?

你不能,column只引用数据帧的column,它不包含任何数据,它不是像数据帧那样的数据结构。

您的f函数也不会像这样工作。如果你想创建一个用于Window的自定义函数,那么你需要一个UDAF(用户定义的聚合函数(,这是非常困难的......

在您的情况下,我会对一个 groupBykey,collect_list您的值,然后应用 UDF 进行缩放。请注意,这仅适用于每个键的数据不会太大(大于适合 1 个执行器的数据(,否则您需要 UDAF

这里有一个例子:

// example scala method, scale to 0-1
def myScaler(data:Seq[Double]) = {
val mi = data.min
val ma = data.max
data.map(x => (x-mi)/(ma-mi))
}
val udf_myScaler = udf(myScaler _)
df
.groupBy($"key")
.agg(
collect_list($"value").as("values")
)
.select($"key",explode(arrays_zip($"values",udf_myScaler($"values"))))
.select($"key",$"col.values",$"col.1".as("values_scaled"))
.show()

给:

+---+------+-------------------+
|key|values|      values_scaled|
+---+------+-------------------+
|  c| 104.0|                0.0|
|  c| 107.4|                1.0|
|  b|  24.0|                1.0|
|  b|  12.5|0.44711538461538464|
|  b|   6.4|0.15384615384615385|
|  b|   3.2|                0.0|
|  a|   2.0|                1.0|
|  a|   1.0| 0.3333333333333333|
|  a|   0.5|                0.0|
+---+------+-------------------+

最新更新