在不同数据框的列之间进行计算,其中包括使用Scala的for循环之类的东西



我有以下数据帧

DF1:
+----------+----------+---------+
|     Place|       lat|      lon|
+----------+----------+---------+
|         A|       X_A|      Y_A|
|         B|       X_B|      Y_B|
|         C|       X_C|      Y_C|
+----------+----------+---------+
DF2:
+----------+----------+---------+
|      City|       lat|      lon|
+----------+----------+---------+
|         D|       X_D|      Y_D|
|         E|       X_E|      Y_E|
|         F|       X_F|      Y_F|
|         G|       X_G|      Y_G|
|         H|       X_H|      Y_H|
|         I|       X_I|      Y_I|
+----------+----------+---------+

我想要得到的是从Place(从DF1)到City(从DF2)的最短欧氏距离

所以我要做的是:首先计算A地到D城市的距离,直到I,然后根据计算结果确定最短距离

下面的伪代码包含一个嵌套的for循环:
for (places = ranging from A until C){
X1 = places.lat
Y1 = places.lon
for (city = ranging from D until I){
X2 = city.lat
Y2 = city.lon
list d = sqrt((X2-X1)^2 - (Y2-Y1)^2))
res[place] = min(d)}

其中res[]实际上是包含最短距离的数据框中的一列。

所以我首先想到的是在两个数据框之间使用CrossJoin(),但后来我不知道该如何在该步骤之后继续。

有谁能帮帮我吗?

完成交叉连接后,可以使用hypot函数计算欧几里德距离,并使用withColumn数据集方法将其存储到distance列中,然后通过Place列将distance列与min聚合函数进行分组得到该列的最小值。

下面是完整的代码:

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.hypot
df1.crossJoin(df2)
.withColumn("distance", hypot(df1.col("lat") - df2.col("lat"), df1.col("lon") - df2.col("lon")))
.groupBy("Place")
.agg(functions.min("distance").as("min_distance"))

您将得到一个包含两列的数据框,类似于下面的:

+-----+-----------------+
|Place|min_distance     |
+-----+-----------------+
|B    |2.68700576850888 |
|C    |2.545584412271571|
|A    |2.82842712474619 |
+-----+-----------------+

如果你只有几个城市,我会说少于1000个城市,你可以避免crossjoin,groupBy和所有相关的洗牌,可能会减慢你的Spark作业:

  • 首先,在citiesPositions数组中收集所有城市的经纬度
  • 然后,添加一个新的列min_distance到你的数据框架,填充这个列如下:
    • citiesPositions数组转换为typedLit函数的列
    • 然后,使用transform函数
  • hypot函数应用于该数组的所有元素
  • 最后,用array_min函数
  • 取这个数组中最小的元素

你可以这样编码:

import org.apache.spark.sql.functions.{array_min, col, hypot, transform, typedLit}
val citiesPositions = df2.select("lat", "lon")
.collect()
.map(row => (row.getDouble(0), row.getDouble(1)))
df1.withColumn(
"min_distance", 
array_min(
transform(
typedLit(citiesPositions), 
x => hypot(col("lat") - x.getItem("_1"), col("lon") - x.getItem("_2"))
)
)
)

此代码适用于Spark 3.0及更高版本。如果您使用的是3.0之前的Spark版本,则应该使用用户定义的函数

替换列转换。

最新更新