How to debug pandas_udfs without having to use Spark



I'm using Python Transforms in Palantir Foundry, and I'm trying to run an algorithm that uses in-memory / non-Spark libraries, while still having it scale automatically and run in Spark (not pandas). If I'm having a hard time writing the code and want to test and develop it locally, but later use the same code in pyspark, how do I do that?

As a concrete example, I want to compute the area of a geojson column containing polygons. Since I need to use some libraries that aren't native to Spark (shapely and pyproj), I understand the best approach (performance-wise) is to use a pandas_udf (also known as streaming udfs or vectorized udfs). But after reading some guides, specifically Introducing Pandas UDF for PySpark, pandas user-defined functions, and Modeling at Scale with Pandas UDFs (with code examples), debugging and getting things to work is still challenging: I can't seem to use break statements, and there's no first-class way to log/print.

The actual dataframe will have millions of rows (relating to millions of polygons), but for simplicity I'd like to test locally with a small dataframe, and scale up to the larger dataset later:

df = spark.createDataFrame(
    [
        ("AFG", '{"type":"Polygon","coordinates":[[[61.210817,35.650072],[62.230651,35.270664],[62.984662,35.404041],[63.193538,35.857166],[63.982896,36.007957],[64.546479,36.312073],[64.746105,37.111818],[65.588948,37.305217],[65.745631,37.661164],[66.217385,37.39379],[66.518607,37.362784],[67.075782,37.356144],[67.83,37.144994],[68.135562,37.023115],[68.859446,37.344336],[69.196273,37.151144],[69.518785,37.608997],[70.116578,37.588223],[70.270574,37.735165],[70.376304,38.138396],[70.806821,38.486282],[71.348131,38.258905],[71.239404,37.953265],[71.541918,37.905774],[71.448693,37.065645],[71.844638,36.738171],[72.193041,36.948288],[72.63689,37.047558],[73.260056,37.495257],[73.948696,37.421566],[74.980002,37.41999],[75.158028,37.133031],[74.575893,37.020841],[74.067552,36.836176],[72.920025,36.720007],[71.846292,36.509942],[71.262348,36.074388],[71.498768,35.650563],[71.613076,35.153203],[71.115019,34.733126],[71.156773,34.348911],[70.881803,33.988856],[69.930543,34.02012],[70.323594,33.358533],[69.687147,33.105499],[69.262522,32.501944],[69.317764,31.901412],[68.926677,31.620189],[68.556932,31.71331],[67.792689,31.58293],[67.683394,31.303154],[66.938891,31.304911],[66.381458,30.738899],[66.346473,29.887943],[65.046862,29.472181],[64.350419,29.560031],[64.148002,29.340819],[63.550261,29.468331],[62.549857,29.318572],[60.874248,29.829239],[61.781222,30.73585],[61.699314,31.379506],[60.941945,31.548075],[60.863655,32.18292],[60.536078,32.981269],[60.9637,33.528832],[60.52843,33.676446],[60.803193,34.404102],[61.210817,35.650072]]]}'),
        ("ALB", '{"type":"Polygon","coordinates":[[[20.590247,41.855404],[20.463175,41.515089],[20.605182,41.086226],[21.02004,40.842727],[20.99999,40.580004],[20.674997,40.435],[20.615,40.110007],[20.150016,39.624998],[19.98,39.694993],[19.960002,39.915006],[19.406082,40.250773],[19.319059,40.72723],[19.40355,41.409566],[19.540027,41.719986],[19.371769,41.877548],[19.304486,42.195745],[19.738051,42.688247],[19.801613,42.500093],[20.0707,42.58863],[20.283755,42.32026],[20.52295,42.21787],[20.590247,41.855404]]]}'),
    ],  # can continue with more countries from https://raw.githubusercontent.com/johan/world.geo.json/34c96bba9c07d2ceb30696c599bb51a5b939b20f/countries.geo.json
    ["country", "geometry"]
)

Given that the geometry column is actually geojson, how can I use a good GIS approach to compute its area (in square meters)? For example, using the approaches outlined in these questions:

Calculate polygon area in planar units (e.g. square meters) in Shapely

How to get the area of a GeoJSON polygon with Python

How to calculate the area of a polygon on the Earth's surface using Python?
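For reference, the planar (shoelace) approach the first linked question discusses can be sketched with nothing but the standard library. Note this yields area in the coordinates' own units (square degrees for lon/lat input), not the square meters asked for here; a hypothetical 1x1 square is used as input:

```python
import json

def shoelace_area(ring):
    """Planar polygon area via the shoelace formula, in coordinate units squared."""
    total = 0.0
    # the ring is closed (first point repeated last), so consecutive pairs cover every edge
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        total += x1 * y2 - x2 * y1
    return abs(total) / 2.0

# a hypothetical GeoJSON string for a unit square, shaped like the geometry column above
geojson = '{"type":"Polygon","coordinates":[[[0,0],[1,0],[1,1],[0,1],[0,0]]]}'
ring = json.loads(geojson)["coordinates"][0]
print(shoelace_area(ring))  # 1.0
```

For real square-meter results on lon/lat data, a geodesic computation (as in the answer below, via pyproj) is the appropriate tool.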

The way to think about pandas_udfs is that you write the logic you want applied to a pandas Series. This means you apply one operation, and it is automatically applied to every row.
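That row-wise pattern can be tried with plain pandas and no Spark at all, since the body of a scalar pandas_udf is just a function from Series to Series (toy JSON strings here, purely as an illustration):

```python
import json
import pandas as pd

# the same shape a scalar pandas_udf body has: a pandas Series in, a pandas Series out
def parse_type(s: pd.Series) -> pd.Series:
    return s.apply(json.loads).apply(lambda d: d["type"])

s = pd.Series(['{"type":"Polygon"}', '{"type":"MultiPolygon"}'])
print(list(parse_type(s)))  # ['Polygon', 'MultiPolygon']
```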

If you want to develop it locally, you can extract a much smaller sample of your data (as you did), store it in a pandas Series, and work there:

from shapely.geometry import Polygon
import json
from pyproj import Geod

# select just the column the pandas udf will operate on
pdf = df.select("geometry").toPandas()
# convert to a pandas Series (.ix is deprecated; use .iloc)
pdf_geom_raw = pdf.iloc[:, 0]
# parse each GeoJSON string into a dict
pdf_geom = pdf_geom_raw.apply(json.loads)

# function using non-spark libraries
def get_area(shape):
    geod = Geod(ellps="WGS84")
    poly = Polygon(shape["coordinates"][0])
    area = abs(geod.geometry_area_perimeter(poly)[0])
    return area

pdf_geom = pdf_geom.apply(get_area)

Here you can try it locally (without Spark) by replacing pdf = df.select("geometry").toPandas() with pdf = pd.read_csv("geo.csv").
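A minimal sketch of that local swap, assuming a hypothetical geo.csv with a single geometry column (an in-memory string stands in for the file here, so the snippet is self-contained):

```python
import io
import json
import pandas as pd

# stands in for a local "geo.csv"; inner quotes are doubled per CSV escaping rules
csv_text = 'geometry\n"{""type"":""Polygon"",""coordinates"":[[[0,0],[1,0],[1,1],[0,1],[0,0]]]}"\n'

pdf = pd.read_csv(io.StringIO(csv_text))      # replaces df.select("geometry").toPandas()
pdf_geom = pdf.iloc[:, 0].apply(json.loads)   # same parsing step as above
print(pdf_geom[0]["type"])  # Polygon
```

From here the same get_area function can be applied with pdf_geom.apply(get_area), exactly as in the Spark-free snippet above.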

Now that it works locally, you can copy-paste the code into a pandas_udf:

from shapely.geometry import Polygon
import json
from pyproj import Geod
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def geodesic_polygon_area(pdf_geom):
    pdf_geom = pdf_geom.apply(json.loads)
    def get_area(shape):
        geod = Geod(ellps="WGS84")
        poly = Polygon(shape["coordinates"][0])
        area = abs(geod.geometry_area_perimeter(poly)[0])
        return area
    pdf_geom = pdf_geom.apply(get_area)
    return pdf_geom

df = df.withColumn('area_square_meters', geodesic_polygon_area(df.geometry))

When running the code:

>>> df.show()
+-------+--------------------+--------------------+
|country|            geometry|  area_square_meters|
+-------+--------------------+--------------------+
|    AFG|{"type":"Polygon"...|6.522700837770404E11|
|    ALB|{"type":"Polygon"...|2.969479517410540...|
+-------+--------------------+--------------------+
