如何使用 JAVA 在 Spark 数据帧上调用 UDF



与这里类似的问题,但没有足够的点在那里发表评论。

根据最新的Spark文档,udf可以以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何将udf与 sql 一起使用的示例,但无法找到有关如何直接在数据帧上使用udf的示例。

o.p. 针对上述链接的问题提供的解决方案使用 __callUDF()__,该_deprecated_,将根据 Spark Java API 文档在 Spark 2.0 中删除。在那里,它说:

"因为它与 udf() 是多余的"

所以这意味着我应该能够使用__udf()__来计算我的udf,但我不知道该怎么做。我没有偶然发现任何阐明Java-Spark程序语法的东西。我错过了什么?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};
sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

Spark>= 2.3

可以直接调用 Scala 风格的udf

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);
df.select(mode.apply(col("vs"))).show();

火花<2.3

即使我们假设您的 UDF 很有用并且不能被简单的 getItem 调用替换,它的签名不正确。数组列是使用 Scala WrappedArray而不是普通 Java 数组公开的,因此您必须调整签名:

UDF1 mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    return types.headOption();
  }
};

如果 UDF 已注册:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

你可以简单地使用 callUDF(这是 1.5 中引入的一个新函数)按名称调用它:

df.select(callUDF("mode", col("vs"))).show();

您也可以在selectExprs中使用它:

df.selectExpr("mode(vs)").show();

相关内容

  • 没有找到相关文章

最新更新