我有一个在一段时间内用户的纬度/纬度格式的位置数据集。我想计算这些用户所走的距离。样本数据集:
|时间戳|用户|纬度|经度|| 1462838468 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1462838512 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1462838389 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1462838497 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1465975885 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 |37.177322 | -7.426781 | 405 | 1457723815 c238e25fe0b9e7…| |37.177922 | -7.447443 | 405 | 1457897289 c238e25fe0b9e7…| |37.177922 | -7.447443 | 405 | 1457899229 c238e25fe0b9e7…| || 405 | 1457972626 c238e25fe0b9e7……| 37.18059| -7.46128|37.177322 | -7.426781 | 405 | 1458062553 c238e25fe0b9e7…| |37.178172 | -7.444512 | 405 | 1458241825 c238e25fe0b9e7…| |37.178172 | -7.444512 | 405 | 1458244457 c238e25fe0b9e7…| |37.177322 | -7.426781 | 405 | 1458412513 c238e25fe0b9e7…| |37.177322 | -7.426781 | 405 | 1458412292 c238e25fe0b9e7…| || 1465197963 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465202192 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923817 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923766 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923748 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923922 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 |
我想过使用自定义聚合器函数,但似乎没有Python支持。此外,这些操作需要以特定的顺序在相邻的点上完成,所以我不知道自定义聚合器是否可以工作。
我也看了reduceByKey
,但距离函数似乎不满足操作者的要求。
是否有一种方法可以在Spark中高效地执行此操作?
这看起来像是窗口函数的工作。假设我们将距离定义为:
from pyspark.sql.functions import acos, cos, sin, lit, toRadians
def dist(long_x, lat_x, long_y, lat_y):
return acos(
sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
cos(toRadians(long_x) - toRadians(long_y))
) * lit(6371.0)
您可以将窗口定义为:
from pyspark.sql.window import Window
w = Window().partitionBy("User").orderBy("Timestamp")
,并使用lag
计算连续观测之间的距离:
from pyspark.sql.functions import lag
df.withColumn("dist", dist(
"longitude", "latitude",
lag("longitude", 1).over(w), lag("latitude", 1).over(w)
).alias("dist"))
之后可以执行标准聚合。