如何将列添加到火花数据框架中,其中每个新元素都取决于另一列的组



我在此形式中有一个火花数据框架:

+----------+-----+-------+-------+
| timestamp|  lat|    lon|user_id|
+----------+-----+-------+-------+
|1511512345|34.12|-120.12|      1|
|1511512348|34.13|-120.13|      1|
|1511512349|34.14|-120.14|      1|
|1511551234|31.11|-122.01|      2|
|1511551236|31.15|-122.03|      2|
+----------+-----+-------+-------+

我需要对每个用户的位置数据的时间进行计算。该计算需要将LAT/LON数据的整个时间表进行地图匹配(即,找到GPS位置的最佳映射到道路地图)。结果是一系列road_id S,然后我想将其附加到DataFrame

+----------+-----+-------+-------+-------+
| timestamp|  lat|    lon|user_id|road_id|
+----------+-----+-------+-------+-------+
|1511512345|34.12|-120.12|      1|     12|
|1511512348|34.13|-120.13|      1|     12|
|1511512349|34.14|-120.14|      1|    345|
|1511551234|31.11|-122.01|      2|    737|
|1511551236|31.15|-122.03|      2|    643|
+----------+-----+-------+-------+-------+

请注意,我需要每个user_id的整个时间表以执行此计算(即无法完成计算,但每个user_id都需要整个组)Spark DataFrame API?我不确定是否可以使用groupbywithColumn或其他一些方法来完成此操作。

df.sortby('timestamp').groupby('user_id').agg(...) ?

road_id序列通常是使用HMM模型计算的,并且是路网和整个LAT/LON序列的函数(如将GPS跟踪与地图匹配中所述)。

基本上,地图匹配器的输入将是LAT/LON值的整个序列,输出将是相同长度的ROAD_ID序列

您需要进行组件来生成一个新的数据帧,然后您将此新数据框架加入原始数据框。

我使用scala(ymmv给您用pyspark标记)。


我的理解是,您要计算每个user_id数据集中的每个记录的值和整个lat/lon序列。

将我视为窗口聚合问题。

让我们定义一个窗口规范(我再次使用scala so ymmv)。

val input = Seq(
  ("1511512345", 34.12, -120.12, 1))
  .toDF("timestamp", "lat", "lon", "user_id")
import org.apache.spark.sql.expressions.Window
val byUserId = Window.partitionBy("user_id").orderBy("timestamp")
val inputWithLatsAndLonsCols = input
  .withColumn("lats", collect_list("lat") over byUserId)
  .withColumn("lons", collect_list("lon") over byUserId)
scala> inputWithLatsAndLonsCols.show
+----------+-----+-------+-------+-------+---------+
| timestamp|  lat|    lon|user_id|   lats|     lons|
+----------+-----+-------+-------+-------+---------+
|1511512345|34.12|-120.12|      1|[34.12]|[-120.12]|
+----------+-----+-------+-------+-------+---------+
// define UDF to do the calculation
// NOTE that the UDF always returns 1 for demo purposes
val roadId = udf { (lats: Seq[Double], lons: Seq[Double]) => 1 }
val roads = inputWithLatsAndLonsCols.withColumn("road_id", roadId($"lats", $"lons"))
scala> roads.show
+----------+-----+-------+-------+-------+---------+-------+
| timestamp|  lat|    lon|user_id|   lats|     lons|road_id|
+----------+-----+-------+-------+-------+---------+-------+
|1511512345|34.12|-120.12|      1|[34.12]|[-120.12]|      1|
+----------+-----+-------+-------+-------+---------+-------+

相关内容

  • 没有找到相关文章

最新更新