Applying a UDF to multiple columns and using numpy operations



I have a dataframe in pyspark called result, and I want to apply a udf to create a new column, as follows:

result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)]).withColumnRenamed("_1","count").withColumnRenamed("_2","df").withColumnRenamed("_3","docs")

@udf("float")
def newFunction(arr):
    return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])

result=result.withColumn("new_function_result",newFunction_udf(array("count","df","docs")))

The columns count, df and docs are all integer columns, but this returns:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

When I pass a single column and compute its square instead, it works fine.

Any help is appreciated.

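The error message is misleading, but it is trying to tell you that your function does not return a float. Your function returns a value of type numpy.float64, which you can get with the VectorUDT type (function newFunctionVector in the example below). Another way to keep using numpy is to convert the numpy.float64 into a Python float (function newFunctionWithArray in the example below).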
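If numpy isn't otherwise needed, one swapped-in alternative to the .item() conversion is to compute with the standard library's math.log, which already returns a built-in float. The function name new_function_scalar is hypothetical; the sketch only checks the return type in plain Python and assumes you would wrap the function with @udf("float") as in the examples below.

```python
import math


def new_function_scalar(count, df, docs):
    # math.log returns a built-in float, so no .item() conversion is needed
    return (1 + math.log(count)) * math.log(docs / df)


value = new_function_scalar(138, 5, 10)
print(type(value).__name__)  # float
print(value)
```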

Last but not least, there is no need to call array, because UDFs can take multiple arguments (function newFunction in the example below).
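The difference between the two call styles can be seen without Spark. The plain-Python sketch below only simulates how a UDF receives row values: with array(...) the function gets one list per row, while with several columns it gets one positional argument per column. The helper names (weight_from_array, weight_from_columns, apply_row_wise) are hypothetical, and math.log stands in for np.log.

```python
import math

# Sample rows from the example DataFrame: (count, df, docs)
ROWS = [(138, 5, 10), (128, 4, 10), (112, 3, 10), (120, 3, 10), (189, 1, 10)]


def weight_from_array(arr):
    """Like newFunctionWithArray: a single list argument, as built by array(...)."""
    return (1 + math.log(arr[0])) * math.log(arr[2] / arr[1])


def weight_from_columns(count, df, docs):
    """Like newFunction: one positional argument per column passed to the UDF."""
    return (1 + math.log(count)) * math.log(docs / df)


def apply_row_wise(func, rows, pack_as_array):
    """Rough stand-in for Spark calling a Python UDF once per row."""
    results = []
    for row in rows:
        if pack_as_array:
            results.append(func(list(row)))  # udf(array(c1, c2, c3)) -> one list
        else:
            results.append(func(*row))  # udf(c1, c2, c3) -> three arguments
    return results


via_array = apply_row_wise(weight_from_array, ROWS, pack_as_array=True)
via_columns = apply_row_wise(weight_from_columns, ROWS, pack_as_array=False)
print(via_array == via_columns)  # True
```

Both styles run the same arithmetic, so the only real difference is how the values are packaged for the function; the multi-argument form skips building an intermediate array column.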

import numpy as np
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType
from pyspark.mllib.linalg import Vectors, VectorUDT

# sqlContext is assumed to be an existing SQLContext (e.g. the one the pyspark shell provides)
result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)], ["count","df","docs"])

# Returns the numpy.float64 unchanged; wrapped below with VectorUDT as its return type
def newFunctionVector(arr):
    return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])

# Takes a single array column and converts the numpy.float64 result to a Python float
@udf("float")
def newFunctionWithArray(arr):
    returnValue = (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])
    return returnValue.item()

# Takes the three columns as separate arguments, so array() is not needed
@udf("float")
def newFunction(count, df, docs):
    returnValue = (1 + np.log(count)) * np.log(docs/df)
    return returnValue.item()

vector_udf = udf(newFunctionVector, VectorUDT())
result=result.withColumn("new_function_result", newFunction("count","df","docs"))
result=result.withColumn("new_function_result_WithArray", newFunctionWithArray(array("count","df","docs")))
result=result.withColumn("new_function_result_Vector", newFunctionWithArray(array("count","df","docs")))
result.printSchema()
result.show()

Output:

root 
|-- count: long (nullable = true) 
|-- df: long (nullable = true) 
|-- docs: long (nullable = true) 
|-- new_function_result: float (nullable = true) 
|-- new_function_result_WithArray: float (nullable = true) 
|-- new_function_result_Vector: float (nullable = true)
+-----+---+----+-------------------+-----------------------------+--------------------------+ 
|count| df|docs|new_function_result|new_function_result_WithArray|new_function_result_Vector|
+-----+---+----+-------------------+-----------------------------+--------------------------+ 
|  138|  5|  10|           4.108459|                     4.108459|                  4.108459| 
|  128|  4|  10|           5.362161|                     5.362161|                  5.362161|
|  112|  3|  10|          6.8849173|                    6.8849173|                 6.8849173|
|  120|  3|  10|           6.967983|                     6.967983|                  6.967983|
|  189|  1|  10|          14.372153|                    14.372153|                 14.372153|  
+-----+---+----+-------------------+-----------------------------+--------------------------+
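As a sanity check on the table above, the formula can be recomputed in plain Python. This is a sketch under one assumption: Spark's FloatType is treated as an IEEE 754 single-precision value, emulated here with a struct round-trip. The names EXPECTED, to_float32 and weight are hypothetical.

```python
import math
import struct

# (count, df, docs) -> value shown in the new_function_result column
EXPECTED = {
    (138, 5, 10): 4.108459,
    (128, 4, 10): 5.362161,
    (112, 3, 10): 6.8849173,
    (120, 3, 10): 6.967983,
    (189, 1, 10): 14.372153,
}


def to_float32(x):
    # Round-trip through a 4-byte single-precision float, like a FloatType column
    return struct.unpack("f", struct.pack("f", x))[0]


def weight(count, df, docs):
    return (1 + math.log(count)) * math.log(docs / df)


results = {}
for (count, df, docs), shown in EXPECTED.items():
    computed = to_float32(weight(count, df, docs))
    results[(count, df, docs)] = computed
    print(count, df, docs, computed, math.isclose(computed, shown, abs_tol=1e-5))
```

Each printed line should end with True; Python prints the exact single-precision value with more digits than Spark's show() does.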
