与这里类似的问题,但没有足够的点在那里发表评论。
根据最新的Spark文档,udf
可以以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何将udf
与 sql 一起使用的示例,但无法找到有关如何直接在数据帧上使用udf
的示例。
o.p. 针对上述链接的问题提供的解决方案使用 __callUDF()__
,该_deprecated_
,将根据 Spark Java API 文档在 Spark 2.0 中删除。在那里,它说:
"因为它与 udf() 是多余的"
所以这意味着我应该能够使用__udf()__
来计算我的udf
,但我不知道该怎么做。我没有偶然发现任何阐明Java-Spark程序语法的东西。我错过了什么?
import org.apache.spark.sql.api.java.UDF1;
.
.
UDF1 mode = new UDF1<String[], String>() {
public String call(final String[] types) throws Exception {
return types[0];
}
};
sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
Spark>= 2.3
可以直接调用 Scala 风格的udf
:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
UserDefinedFunction mode = udf(
(Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);
df.select(mode.apply(col("vs"))).show();
火花<2.3
即使我们假设您的 UDF 很有用并且不能被简单的 getItem
调用替换,它的签名不正确。数组列是使用 Scala WrappedArray
而不是普通 Java 数组公开的,因此您必须调整签名:
UDF1 mode = new UDF1<Seq<String>, String>() {
public String call(final Seq<String> types) throws Exception {
return types.headOption();
}
};
如果 UDF 已注册:
sqlContext.udf().register("mode", mode, DataTypes.StringType);
你可以简单地使用 callUDF
(这是 1.5 中引入的一个新函数)按名称调用它:
df.select(callUDF("mode", col("vs"))).show();
您也可以在selectExprs
中使用它:
df.selectExpr("mode(vs)").show();