Converting an array to a DenseVector in a Spark DataFrame using Java



I am running Spark 2.3. I want to convert the features column (an ArrayType of floats) in the following DataFrame to a DenseVector. I am using Spark from Java.

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[4.191401, -1.793...|
| 10|[-0.5674514, -1.3...|
| 20|[0.735613, -0.026...|
| 30|[-0.030161237, 0....|
| 40|[-0.038345724, -0...|
+---+--------------------+
root
|-- id: integer (nullable = false)
|-- features: array (nullable = true)
|    |-- element: float (containsNull = false)

I wrote the following UDF, but it does not seem to work:

private static UDF1 toVector = new UDF1<Float[], Vector>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(Float[] t1) throws Exception {
        double[] DoubleArray = new double[t1.length];
        for (int i = 0; i < t1.length; i++) {
            DoubleArray[i] = (double) t1[i];
        }
        Vector vector = (org.apache.spark.mllib.linalg.Vector) Vectors.dense(DoubleArray);
        return vector;
    }
};

I want to extract the features as a vector so that I can run clustering on them.

I also register the UDF and then call it, as shown below:

spark.udf().register("toVector", (UserDefinedAggregateFunction) toVector);
df3 = df3.withColumn("featuresnew", callUDF("toVector", df3.col("features")));
df3.show();  

When I run this snippet, I get the following error:

ReadProcessData$1 cannot be cast to org.apache.spark.sql.expressions.UserDefinedAggregateFunction

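The problem is in how the udf is registered with Spark. You should not cast it to UserDefinedAggregateFunction, which is not a udf but a udaf (a function used for aggregation). Instead, register it like this: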

spark.udf().register("toVector", toVector, new VectorUDT());

Then, to use the registered function:

df3.withColumn("featuresnew", callUDF("toVector", df3.col("features")));
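As background on the error message: the name ReadProcessData$1 is the compiler-generated name of the anonymous class that implements UDF1, and the cast fails because that class does not extend UserDefinedAggregateFunction. The sketch below reproduces the same situation using only the JDK. PlainFunction, AggregateFunction and CastDemo are hypothetical stand-ins invented for this illustration; they are not Spark types.

```java
// Stand-in types for illustration only; these are NOT Spark classes.
interface PlainFunction<T, R> {      // plays the role of UDF1
    R call(T input) throws Exception;
}

abstract class AggregateFunction {   // plays the role of UserDefinedAggregateFunction
    abstract Object evaluate();
}

final class CastDemo {

    // Anonymous class implementing only PlainFunction.
    // The compiler names it after the enclosing class plus "$1".
    static final Object TO_LENGTH = new PlainFunction<String, Integer>() {
        @Override
        public Integer call(String input) {
            return input.length();
        }
    };

    // Returns false when the cast throws ClassCastException,
    // i.e. when the object is not an AggregateFunction.
    static boolean canCastToAggregate(Object candidate) {
        try {
            AggregateFunction ignored = (AggregateFunction) candidate;
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(TO_LENGTH.getClass().getName());
        System.out.println(canCastToAggregate(TO_LENGTH));
    }
}
```

The cast compiles because the compiler cannot rule it out, but it fails at run time, which is the same situation as in the question.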

The udf itself should also be adjusted slightly, as follows:

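// Spark hands an ArrayType column to a Java UDF as a Scala Seq, not as a Java array.
// Vector, Vectors and VectorUDT must all come from the same package
// (org.apache.spark.ml.linalg if the column feeds spark.ml estimators).
UDF1<Seq<Float>, Vector> toVector = new UDF1<Seq<Float>, Vector>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(Seq<Float> t1) throws Exception {
        // Convert the Scala Seq to a java.util.List so it can be indexed from Java.
        List<Float> L = scala.collection.JavaConversions.seqAsJavaList(t1);
        double[] DoubleArray = new double[t1.length()];
        for (int i = 0; i < L.size(); i++) {
            DoubleArray[i] = L.get(i); // unboxes the Float and widens it to double
        }
        return Vectors.dense(DoubleArray);
    }
};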
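Since the schema reports features as nullable, the udf may also be handed a null array. One way to cover that case, and to test the conversion without a Spark session, is to move the loop into a plain helper that the udf body delegates to. This is only a minimal sketch: the class FeatureArrays and the method toDoubleArray are hypothetical names introduced here and are not part of Spark.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Spark): holds the Float-to-double
// conversion so it can be exercised without a Spark session.
final class FeatureArrays {

    private FeatureArrays() {}

    // Widens each Float to a double. Returns null for a null input,
    // because the features column is declared nullable in the schema.
    static double[] toDoubleArray(List<Float> values) {
        if (values == null) {
            return null;
        }
        double[] result = new double[values.size()];
        for (int i = 0; i < values.size(); i++) {
            result[i] = values.get(i); // unboxes to float, then widens to double
        }
        return result;
    }

    public static void main(String[] args) {
        double[] converted = toDoubleArray(Arrays.asList(1.5f, -2.0f, 0.25f));
        System.out.println(Arrays.toString(converted)); // prints [1.5, -2.0, 0.25]
    }
}
```

Inside call, the body could then check the result for null and otherwise pass it to Vectors.dense. Whether a null row should yield a null vector or be filtered out beforehand depends on the clustering step that follows.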

Note that in Spark 2.3+ you can create a Scala-style udf that can be called directly. From this answer:

UserDefinedFunction toVector = udf(
(Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);
df3.withColumn("featuresnew", toVector.apply(col("features")));
