我想知道如何根据另一行更新一些行。
例如,我有一些数据像
Id | useraname | ratings | city
--------------------------------
1, philip, 2.0, montreal, ...
2, john, 4.0, montreal, ...
3, charles, 2.0, texas, ...
我想将同一城市的用户更新为相同的groupId(1或2)
Id | useraname | ratings | city
--------------------------------
1, philip, 2.0, montreal, ...
1, john, 4.0, montreal, ...
3, charles, 2.0, texas, ...
如何在我的RDD或Dataset中实现这一点?
为了完整起见,如果Id
是一个字符串,密集排序就不起作用了呢?
例如?
Id | useraname | ratings | city
--------------------------------
a, philip, 2.0, montreal, ...
b, john, 4.0, montreal, ...
c, charles, 2.0, texas, ...
所以结果是这样的:
grade | useraname | ratings | city
--------------------------------
a, philip, 2.0, montreal, ...
a, john, 4.0, montreal, ...
c, charles, 2.0, texas, ...
这样做的一个干净的方法是使用Window
函数中的dense_rank()
。它枚举Window
列中的唯一值。因为city
是String
列,这些将按字母顺序递增。
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window
val df = spark.createDataFrame(Seq(
(1, "philip", 2.0, "montreal"),
(2, "john", 4.0, "montreal"),
(3, "charles", 2.0, "texas"))).toDF("Id", "username", "rating", "city")
val w = Window.orderBy($"city")
df.withColumn("id", rank().over(w)).show()
+---+--------+------+--------+
| id|username|rating| city|
+---+--------+------+--------+
| 1| philip| 2.0|montreal|
| 1| john| 4.0|montreal|
| 2| charles| 2.0| texas|
+---+--------+------+--------+
尝试:
df.select("city").distinct.withColumn("id", monotonically_increasing_id).join(df.drop("id"), Seq("city"))