我有以下数据帧
DF1:
+----------+----------+---------+
| Place| lat| lon|
+----------+----------+---------+
| A| X_A| Y_A|
| B| X_B| Y_B|
| C| X_C| Y_C|
+----------+----------+---------+
DF2:
+----------+----------+---------+
| City| lat| lon|
+----------+----------+---------+
| D| X_D| Y_D|
| E| X_E| Y_E|
| F| X_F| Y_F|
| G| X_G| Y_G|
| H| X_H| Y_H|
| I| X_I| Y_I|
+----------+----------+---------+
我想要得到的是从Place(从DF1)到City(从DF2)的最短欧氏距离
所以我要做的是:首先计算A地到D城市的距离,直到I,然后根据计算结果确定最短距离
下面的伪代码包含一个嵌套的for循环:for (places = ranging from A until C){
X1 = places.lat
Y1 = places.lon
for (city = ranging from D until I){
X2 = city.lat
Y2 = city.lon
list d = sqrt((X2-X1)^2 - (Y2-Y1)^2))
res[place] = min(d)}
其中res[]
实际上是包含最短距离的数据框中的一列。
所以我首先想到的是在两个数据框之间使用CrossJoin()
,但后来我不知道该如何在该步骤之后继续。
有谁能帮帮我吗?
完成交叉连接后,可以使用hypot
函数计算欧几里德距离,并使用withColumn
数据集方法将其存储到distance
列中,然后通过Place
列将distance
列与min
聚合函数进行分组得到该列的最小值。
下面是完整的代码:
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.hypot
df1.crossJoin(df2)
.withColumn("distance", hypot(df1.col("lat") - df2.col("lat"), df1.col("lon") - df2.col("lon")))
.groupBy("Place")
.agg(functions.min("distance").as("min_distance"))
您将得到一个包含两列的数据框,类似于下面的:
+-----+-----------------+
|Place|min_distance |
+-----+-----------------+
|B |2.68700576850888 |
|C |2.545584412271571|
|A |2.82842712474619 |
+-----+-----------------+
如果你只有几个城市,我会说少于1000个城市,你可以避免crossjoin
,groupBy
和所有相关的洗牌,可能会减慢你的Spark作业:
- 首先,在
citiesPositions
数组中收集所有城市的经纬度 - 然后,添加一个新的列
min_distance
到你的数据框架,填充这个列如下:- 将
citiesPositions
数组转换为typedLit
函数的列 然后,使用
transform
函数 将 - 将
- 最后,用
array_min
函数 取这个数组中最小的元素
hypot
函数应用于该数组的所有元素你可以这样编码:
import org.apache.spark.sql.functions.{array_min, col, hypot, transform, typedLit}
val citiesPositions = df2.select("lat", "lon")
.collect()
.map(row => (row.getDouble(0), row.getDouble(1)))
df1.withColumn(
"min_distance",
array_min(
transform(
typedLit(citiesPositions),
x => hypot(col("lat") - x.getItem("_1"), col("lon") - x.getItem("_2"))
)
)
)
此代码适用于Spark 3.0及更高版本。如果您使用的是3.0之前的Spark版本,则应该使用用户定义的函数
替换列转换。