如何在Spark Shell中注册Java Spark UDF



以下是我的java udf代码,

package com.udf;
import org.apache.spark.sql.api.java.UDF1;
public class SparkUDF implements UDF1<String, String> {
    @Override
    public String call(String arg) throws Exception {
        if (validateString(arg))
            return arg;
        return "INVALID";
    }
public static boolean validateString(String arg) {
    if (arg == null | arg.length() != 11)
        return false;
    else
        return true;
}
}

我正在用SparkUdf-1.0-SNAPSHOT.jar

建造罐子

我在Hive中有一个表名,并想在Spark Shell上的SQL下运行。

> select UDF(name) from sample ;

使用下面的命令开始火花壳。

Spark-shell - Jars Sparkudf-1.0-Snapshot.jar

任何人都可以说,如何在Spark Shell上注册UDF以在Spark SQL中使用它?

在进行更多搜索之后,我得到了答案,

以下是步骤,

spark-shell --jars SparkUdf-1.0-SNAPSHOT.jar
scala> import com.udf.SparkUDF;
scala> import com.udf.SparkUDF;
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
scala> spark.udf.register("myfunc", new SparkUDF(),StringType)
scala> val sql1 = """ select myfunc(name) from sample """
scala> spark.sql(sql1).show();

您将获得结果。

如果您想从jupyter笔记本和s3上的udf jar测试UDF:

步骤1:将UDF Jar加载到Jupyter笔记本:

%%configure -f 
{ 
    "conf": { 
        "spark.jars": "s3://s3-path/your-udf.jar" 
    } 
} 

步骤2:在PySpark中注册基于Scala的UDF

spark.udf.registerJavaFunction("myudf", "<udf.package>.<UDFClass>") 

步骤3:从Spark SQL

调用UDF
df = spark.read.parquet("s3://s3-path-to-test-data/ts_date=2021-04-27") 
df.createOrReplaceTempView('stable') 
spark.sql("select *, myudf(arg1,arg2) as result from stable ").show(5,False) 

最新更新