我有一个用我的Spark Java代码编写的UDF,我想在其中传递超过22列(正好是24列)。但是 Spark API 最多只允许 22 列,有什么技巧可以覆盖它,或者我可以创建自定义 UDF 函数来覆盖这个限制吗?
您可以传递复杂类型的列。最通用的解决方案是结构,但您也可以考虑数组或映射。
地图示例中的参数:
val df = sc.parallelize(Seq(("a","b"),("c","d"),
("e","f"))).toDF("one","two")
val myUDF = udf((input:Map[String,String]) => {
// do something with the input
input("one")=="a"
})
df
.withColumn("udf_args",map(
lit("one"),$"one",
lit("two"),$"one"
)
)
.withColumn("udf_result", myUDF($"udf_args"))
.show()
您可以将列值数组传递给 udf,而不是传递 24 个列值,操作将在数组上进行。下面是示例代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
case class department(id: Integer, deptname: String)
import spark.implicits._
val df1 = Seq(department(1, "physics")
, department(2, "computer")).toDF()
val df2 = df1.withColumn("all_col", array($"id", $"deptname"))
val concat_udf:UserDefinedFunction = udf((all_col_values:Seq[String]) => {
(all_col_values(0) + "-" + all_col_values(1))
})
//apply udf
val df3 = df2.withColumn("all_col_concat",concat_udf(col("all_col")))
df3.show()
额外:如果可以在没有 udf 的情况下对每一行应用匿名函数,您可以尝试这种方式,但不确定它是否满足要求。
import org.apache.spark.sql.Row
val df4 = df1.rdd.map{ case Row(id:Integer, deptname:String) => (id, deptname,id.toString()+"-"+deptname)}.
toDF("id","deptname", "all_col_concat")
df4.show()
我看到很多答案都是用scala写的,正如你在Spark java中问的那样,我将用Java重写它。答案也可以在任意数量的列中使用。
import static org.apache.spark.sql.functions.array;
List<Column> cols = Arrays.asList(new Column[] {ds.select("col1"), ds.select("col2") ...});// all the columns
Column mergedCol = array(cols.toArray(new Column[cols.size()])); //merge all your cols
//udf
UserDefinedFunction myUdf = udf(
(Seq<Object> seq) -> {
//you should have 24 Objects here.
for (Object o : JavaConverters.seqAsJavaListConverter(seq).asJava()) {
...
);
},
DataTypes.[your data type]);
//use it as
ds.select(myUdf.apply(mergedCol));