如何在 scala 中的数据帧中的多个列上执行 mapreduce



我的火花数据帧如下所示:

+-------+------+-------+------+------+
|userid1|time  |userid2|name1 |name2 |
+-------+------+-------+------+------+
|23     |1     |33     |user1 |user2 | 
|23     |2     |33     |new   |user2 |
|231    |1     |23     |231n  |new   |
|231    |4     |33     |231n  |user2 |
+-------+------+-------+------+------+

对于每一行,有 2 个具有相应名称的用户 ID,但只有一次。

我想为每个用户获取最新名称。 这就像将科隆userid1userid2结合起来。

结果应该是:

+------+-----------+
|userid|latest name|
+------+-----------+
|23    |new        |
|33    |user2      |
|231   |231n       |
+------+-----------+

我该怎么做?

我正在考虑使用partitonBy但我不知道如何将列userid1的结果和userid2结合起来并获取最新名称。

我也在考虑使用rdd.flatMap((row => row._1 -> row._2),(row => row._3 -> row._2)).reduceByKey(_ max _))但它是数据帧,而不是 RDD,我不确定语法。daatframe 中的 col 和 $ 真的让我感到困惑。(对不起,我对Spark相对较新。

你能试试这个解决方案吗?

import spark.implicits._
val users = Seq(
  (23, 1, 33, "user1", "user2"),
  (23, 2, 33, "new", "user2"),
  (231, 1, 23, "231", "new"),
  (231, 4, 33, "231", "user2")
).toDF("userid1", "time", "userid2", "name1", "name2")
val users1 = users.select(col("userid1").as("userid"), col("name1").as("name"), col("time"))
val users2 = users.select(col("userid2").as("userid"), col("name2").as("name"), col("time"))
val unitedUsers = users1.union(users2)
val resultDf = unitedUsers
  .withColumn("max_time", max("time").over(Window.partitionBy("userid")))
  .where(col("max_time") === col("time"))
  .select(col("userid"), col("name").as("latest_name"))
  .distinct()

最新更新