如何在 PySpark 中使用 Scala UDF



我希望能够在 PySpark 中使用 Scala 函数作为 UDF

package com.test
object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
} 

我可以在 PySpark 中访问testFunction1并让它返回值:

functions = sc._jvm.com.test.ScalaPySparkUDFs 
functions.testFunction1(10)

我希望能够做的是将此函数用作 UDF,最好是在withColumn调用中:

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))

我认为这里有一个有前途的方法:Spark:如何将Python与Scala或Java用户定义函数映射?

但是,当对找到的代码进行更改以改用testUDFFunction1时:

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))

我得到:

 AttributeError: 'JavaMember' object has no attribute 'apply' 

我不明白这一点,因为我相信testUDFFunction1确实有应用方法?

我不想使用此处找到的表达式类型:将UDF从Scala注册到SqlContext,以便在PySpark中使用

关于如何使这项工作的任何建议将不胜感激!

同意@user6910411,您必须直接在函数上调用 apply 方法。所以,你的代码将是。

Scala 中的 UDF:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

object ScalaPySparkUDFs {
    def testFunction1(x: Int): Int = { x * 2 }
    def getFun(): UserDefinedFunction = udf(testFunction1 _ )
}

PySpark 代码:

def test_udf(col):
    sc = spark.sparkContext
    _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
    return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", test_udf(numbers['Value']))

您链接的问题是使用 Scala object 。Scala object是一个单例,你可以直接使用apply方法。

这里你使用一个空函数,它返回一个UserDefinedFunction类 co 的对象,你必须首先调用该函数:

_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))

相关内容

  • 没有找到相关文章

最新更新