在 Spark Java 中将超过 22 列传递给 UDF



我有一个用我的Spark Java代码编写的UDF,我想在其中传递超过22列(正好是24列)。但是 Spark API 最多只允许 22 列,有什么技巧可以覆盖它,或者我可以创建自定义 UDF 函数来覆盖这个限制吗?

您可以传递复杂类型的列。最通用的解决方案是结构,但您也可以考虑数组或映射。

地图示例中的参数:

    val df = sc.parallelize(Seq(("a","b"),("c","d"), 
      ("e","f"))).toDF("one","two")

     val myUDF = udf((input:Map[String,String]) => {
      // do something with the input
       input("one")=="a"
       })
     df
    .withColumn("udf_args",map(
       lit("one"),$"one",
        lit("two"),$"one"
      )
    )
    .withColumn("udf_result", myUDF($"udf_args"))
     .show()

您可以将列值数组传递给 udf,而不是传递 24 个列值,操作将在数组上进行。下面是示例代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
case class department(id: Integer, deptname: String)
import spark.implicits._
val df1 = Seq(department(1, "physics")
      , department(2, "computer")).toDF()
val df2 = df1.withColumn("all_col", array($"id", $"deptname"))
val concat_udf:UserDefinedFunction = udf((all_col_values:Seq[String]) => {
      (all_col_values(0) + "-" + all_col_values(1))
    })
//apply udf
val df3 = df2.withColumn("all_col_concat",concat_udf(col("all_col")))
df3.show()

额外:如果可以在没有 udf 的情况下对每一行应用匿名函数,您可以尝试这种方式,但不确定它是否满足要求。

import org.apache.spark.sql.Row
val df4 = df1.rdd.map{ case Row(id:Integer, deptname:String) => (id, deptname,id.toString()+"-"+deptname)}.
          toDF("id","deptname", "all_col_concat")
df4.show()

我看到很多答案都是用scala写的,正如你在Spark java中问的那样,我将用Java重写它。答案也可以在任意数量的列中使用。

import static org.apache.spark.sql.functions.array;
List<Column> cols =  Arrays.asList(new Column[] {ds.select("col1"), ds.select("col2") ...});// all the columns
Column mergedCol = array(cols.toArray(new Column[cols.size()])); //merge all your cols
//udf
UserDefinedFunction myUdf = udf(
    (Seq<Object> seq) -> {
        //you should have 24 Objects here. 
        for (Object o : JavaConverters.seqAsJavaListConverter(seq).asJava()) {                  
                ...         
        );
    },
    DataTypes.[your data type]);
//use it as
ds.select(myUdf.apply(mergedCol));

相关内容

  • 没有找到相关文章

最新更新