PYSPARK:将多个数据帧字段传递到UDF



我是Spark和Python的新手。任何帮助都赞赏。

我有一个UDF,并与我们一起创建了一个火花数据框架,纬度和经度

udf:

import math
def distance(origin, destination):
lat1, lon1 = origin
lat2, lon2 = destination
radius = 6371 # km
dlat = math.radians(lat2-lat1)
dlon = math.radians(lon2-lon1)
a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) 
    * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
d = radius * c
return d

样本UDF输出:

distance((101,121),(-121,-212)) 

15447.812243421227

dataframe:

zip=spark.read.option("sep", ",").csv('wasb://hdiazurepoc@dsazurepoc.blob.core.windows.net/main/zip.txt')
zip1=zip.select(zip._c0,zip._c1.cast("Double"),zip._c2.cast("Double"))

示例zip1数据:

zip1.first()        

行(_c0 = u'00601',_c1 = 18.180555,_c2 = -66.749961)

现在,我试图将纬度和经度从DF ZIP1传递到UDF距离,但是我遇到了"需要float是需要"的错误。我相信UDF没有从DF字段中获取数据,而是将其读取DF列作为常量值;因此我要低于错误。

z=zip1.select(distance((zip1._c1,100.23),(zip1._c2,-99.21)))

追溯(最近的最新电话):
文件",第1行,
文件",第5行,在距离
TypeError:需要浮子

请让我知道将DF字段传递到UDF的正确方法。

我不确定您拥有的数据模式是什么。但是以下示例是使用udf获取示例答案的正确方法。

from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371 # km
    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) 
    * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d
df = spark.createDataFrame([([101, 121], [-121, -212])], ["origin", "destination"])
filter_udf = udf(distance, DoubleType())
df.withColumn("distance", filter_udf(df.origin, df.destination))
+----------+------------+------------------+
|    origin| destination|          distance|
+----------+------------+------------------+
|[101, 121]|[-121, -212]|15447.812243421227|
+----------+------------+------------------+

最新更新