如何使用(Py)Spark在数据集中求和数据点之间的距离



我有一个在一段时间内用户的纬度/纬度格式的位置数据集。我想计算这些用户所走的距离。样本数据集:

|时间戳|用户|纬度|经度|| 1462838468 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1462838512 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1462838389 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1462838497 | 49 b4361512443a4da…| 39.777982 | -7.054599 || 1465975885 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 |37.177322 | -7.426781 | 405 | 1457723815 c238e25fe0b9e7…| |37.177922 | -7.447443 | 405 | 1457897289 c238e25fe0b9e7…| |37.177922 | -7.447443 | 405 | 1457899229 c238e25fe0b9e7…| || 405 | 1457972626 c238e25fe0b9e7……| 37.18059| -7.46128|37.177322 | -7.426781 | 405 | 1458062553 c238e25fe0b9e7…| |37.178172 | -7.444512 | 405 | 1458241825 c238e25fe0b9e7…| |37.178172 | -7.444512 | 405 | 1458244457 c238e25fe0b9e7…| |37.177322 | -7.426781 | 405 | 1458412513 c238e25fe0b9e7…| |37.177322 | -7.426781 | 405 | 1458412292 c238e25fe0b9e7…| || 1465197963 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465202192 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923817 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923766 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923748 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 || 1465923922 | 6 e9e0581e2a032fd8…| 37.118362 | -8.205041 |

我想过使用自定义聚合器函数,但似乎没有Python支持。此外,这些操作需要以特定的顺序在相邻的点上完成,所以我不知道自定义聚合器是否可以工作。

我也看了reduceByKey,但距离函数似乎不满足操作者的要求。

是否有一种方法可以在Spark中高效地执行此操作?

这看起来像是窗口函数的工作。假设我们将距离定义为:

from pyspark.sql.functions import acos, cos, sin, lit, toRadians
def dist(long_x, lat_x, long_y, lat_y):
    return acos(
        sin(toRadians(lat_x)) * sin(toRadians(lat_y)) + 
        cos(toRadians(lat_x)) * cos(toRadians(lat_y)) * 
            cos(toRadians(long_x) - toRadians(long_y))
    ) * lit(6371.0)

您可以将窗口定义为:

from pyspark.sql.window import Window
w = Window().partitionBy("User").orderBy("Timestamp")

,并使用lag计算连续观测之间的距离:

from pyspark.sql.functions import lag
df.withColumn("dist", dist(
    "longitude", "latitude",
    lag("longitude", 1).over(w), lag("latitude", 1).over(w)
).alias("dist"))

之后可以执行标准聚合。

相关内容

  • 没有找到相关文章

最新更新