>我有一个这样的数据帧:
val df = Seq(
("a", Seq(2.0)),
("a", Seq(1.0)),
("a", Seq(0.5)),
("b", Seq(24.0)),
("b", Seq(12.5)),
("b", Seq(6.4)),
("b", Seq(3.2)),
("c", Seq(104.0)),
("c", Seq(107.4))
).toDF("key", "value")
我需要使用一种算法,该算法在不同组上输入数据帧对象。 为了更清楚地说明这一点,假设我必须使用按组缩放的标准缩放器。
在熊猫中,我会做这样的事情(过程中的许多类型更改(:
from sklearn.preprocessing import StandardScaler
df.groupby(key)
.value
.transform(lambda x: StandardScaler
.fit_transform(x
.values
.reshape(-1,1))
.reshape(-1))
我需要在 scala 中执行此操作,因为我需要使用的算法不是 Scaler,而是 scala 中内置的另一件事。
到目前为止,我已经尝试做这样的事情:
import org.apache.spark.ml.feature.StandardScaler
def f(X : org.apache.spark.sql.Column) : org.apache.spark.sql.Column = {
val scaler = new StandardScaler()
.setInputCol("value")
.setOutputCol("scaled")
val output = scaler.fit(X)("scaled")
(output)
}
df.withColumn("scaled_values", f(col("features")).over(Window.partitionBy("key")))
但当然,它给了我一个错误:
command-144174313464261:21:错误:类型不匹配; 找到 : org.apache.spark.sql.Column 必需: org.apache.spark.sql.Dataset[_] val output = scaler.fit(X(("scaled"(
所以我正在尝试将单个列对象转换为数据帧对象,但没有成功。我该怎么做?
如果不可能,是否有任何解决方法可以解决此问题?
更新 1
似乎我在代码中犯了一些错误,我试图修复它(我认为我做对了(:
val df = Seq(
("a", 2.0),
("a", 1.0),
("a", 0.5),
("b", 24.0),
("b", 12.5),
("b", 6.4),
("b", 3.2),
("c", 104.0),
("c", 107.4)
).toDF("key", "value")
def f(X : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.Column = {
val assembler = new VectorAssembler()
.setInputCols(Array("value"))
.setOutputCol("feature")
val scaler = new StandardScaler()
.setInputCol("feature")
.setOutputCol("scaled")
val pipeline = new Pipeline()
.setStages(Array(assembler, scaler))
val output = pipeline.fit(X).transform(X)("scaled")
(output)
}
someDF.withColumn("scaled_values", f(someDF).over(Window.partitionBy("key")))
我仍然收到错误:
org.apache.spark.sql.AnalysisException: 表达式 'scaled#1294' not 在窗口函数中受支持。
我不确定此错误的原因,我尝试为列添加别名,但它似乎不起作用。
所以我试图将单个列对象转换为数据帧对象,但没有成功。我该怎么做?
你不能,column
只引用数据帧的column
,它不包含任何数据,它不是像数据帧那样的数据结构。
您的f
函数也不会像这样工作。如果你想创建一个用于Window
的自定义函数,那么你需要一个UDAF(用户定义的聚合函数(,这是非常困难的......
在您的情况下,我会对一个 groupBykey
,collect_list您的值,然后应用 UDF 进行缩放。请注意,这仅适用于每个键的数据不会太大(大于适合 1 个执行器的数据(,否则您需要 UDAF
这里有一个例子:
// example scala method, scale to 0-1
def myScaler(data:Seq[Double]) = {
val mi = data.min
val ma = data.max
data.map(x => (x-mi)/(ma-mi))
}
val udf_myScaler = udf(myScaler _)
df
.groupBy($"key")
.agg(
collect_list($"value").as("values")
)
.select($"key",explode(arrays_zip($"values",udf_myScaler($"values"))))
.select($"key",$"col.values",$"col.1".as("values_scaled"))
.show()
给:
+---+------+-------------------+
|key|values| values_scaled|
+---+------+-------------------+
| c| 104.0| 0.0|
| c| 107.4| 1.0|
| b| 24.0| 1.0|
| b| 12.5|0.44711538461538464|
| b| 6.4|0.15384615384615385|
| b| 3.2| 0.0|
| a| 2.0| 1.0|
| a| 1.0| 0.3333333333333333|
| a| 0.5| 0.0|
+---+------+-------------------+