Spark Build自定义列函数,用户定义函数



我正在使用Scala,并希望构建自己的DataFrame函数。例如,我想将列视为数组,遍历每个元素并进行计算。

首先,我尝试实现我自己的getMax方法。因此,列x的值为[3,8,2,5,9],该方法的预期输出为9。

以下是它在Scala 中的样子

def getMax(inputArray: Array[Int]): Int = {
   var maxValue = inputArray(0)
   for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
     maxValue = inputArray(i)
   }
   maxValue
}

这就是我到目前为止所拥有的,并得到这个错误

"value length is not a member of org.apache.spark.sql.column", 

而且我不知道该如何遍历该列。

def getMax(col: Column): Column = {
var maxValue = col(0)
for (i <- 1 until col.length if col(i) > maxValue){
    maxValue = col(i)
}
maxValue

}

一旦我能够实现我自己的方法,我将创建一个列函数

val value_max:org.apache.spark.sql.Column=getMax(df.col(“value”)).as(“value_max”)

然后我希望能够在SQL语句中使用它,例如

val sample = sqlContext.sql("SELECT value_max(x) FROM table")

给定输入列[3,8,2,9],预期输出为9

我正在关注另一个线程Spark Scala的答案——我如何迭代数据帧中的行,并将计算值添加为数据帧的新列,在那里它们创建了一个标准偏差的私有方法。我将要做的计算将比这更复杂,(例如,我将比较列中的每个元素),我是朝着正确的方向前进,还是应该更多地研究用户定义函数?

在Spark DataFrame中,由于Column不是可迭代对象,因此无法使用您所想到的方法迭代Column的元素。

然而,要处理列的值,您有一些选项,正确的选项取决于您的任务:

1)使用现有的内置功能

Spark SQL已经有很多用于处理列的有用函数,包括聚合和转换函数。其中大部分可以在functions软件包(此处为文档)中找到。其他一些(通常是二进制函数)可以直接在Column对象中找到(此处为文档)。所以,如果你能使用它们,这通常是最好的选择注意:不要忘记窗口函数。

2)创建UDF

如果您不能用内置函数完成任务,您可以考虑定义一个UDF(用户定义函数)。当您可以独立处理列的每个项,并且希望生成与原始列(而不是聚合列)具有相同行数的新列时,它们非常有用。这种方法非常简单:首先,定义一个简单的函数,然后将其注册为UDF,然后使用它

def myFunc: (String => String) = { s => s.toLowerCase }
import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)
val newDF = df.withColumn("newCol", myUDF(df("oldCol")))

想了解更多信息,这里有一篇不错的文章。

3)使用UDAF

如果您的任务是创建聚合数据,则可以定义UDAF(用户定义聚合函数)。我对此没有太多经验,但我可以给你一个很好的教程:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

4)回退到RDD处理

如果您真的不能使用上面的选项,或者如果您处理任务依赖于不同的行来处理一个,并且它不是聚合,那么我认为您必须选择您想要的列,并使用相应的RDD来处理它。示例:

val singleColumnDF = df("column")
val myRDD = singleColumnDF.rdd
// process myRDD

所以,我能想到一些选择。我希望它能有所帮助。

优秀的文档中给出了一个简单的例子,其中有一整节专门介绍UDF:

import org.apache.spark.sql._
val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
val spark = df.sparkSession
spark.udf.register("simpleUDF", (v: Int) => v * v)
df.select($"id", callUDF("simpleUDF", $"value"))

相关内容

  • 没有找到相关文章

最新更新