i在Spark中有一个带有2列group_id
和value
的数据框,其中value
是双重的。我想根据group_id
对数据进行分组,通过value
订购每个组,然后添加第三列index
,该列代表value
在组值的顺序中的位置。
例如,考虑以下输入数据:
+--------+-----+
|group_id|value|
+--------+-----+
|1 |1.3 |
|2 |0.8 |
|1 |3.4 |
|1 |-1.7 |
|2 |2.3 |
|2 |5.9 |
|1 |2.7 |
|1 |0.0 |
+--------+-----+
然后输出将是
+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|1 |-1.7 |1 |
|1 |0.0 |2 |
|1 |1.3 |3 |
|1 |2.7 |4 |
|1 |3.4 |5 |
|2 |0.8 |1 |
|2 |2.3 |2 |
|2 |5.9 |3 |
+--------+-----+-----+
如果索引基于0以及排序是上升还是下降。
是不重要的。作为后续行动,请考虑原始数据中有第三列extra
的情况,该数据对某些(group_id, value)
组合进行了多个值。一个例子是:
+--------+-----+-----+
|group_id|value|extra|
+--------+-----+-----+
|1 |1.3 |1 |
|1 |1.3 |2 |
|2 |0.8 |1 |
|1 |3.4 |1 |
|1 |3.4 |2 |
|1 |3.4 |3 |
|1 |-1.7 |1 |
|2 |2.3 |1 |
|2 |5.9 |1 |
|1 |2.7 |1 |
|1 |0.0 |1 |
+--------+-----+-----+
有没有办法添加index
列,以使extra
列不考虑而是仍然保留?在这种情况下的输出将为
+--------+-----+-----+-----+
|group_id|value|extra|index|
+--------+-----+-----+-----+
|1 |-1.7 |1 |1 |
|1 |0.0 |1 |2 |
|1 |1.3 |1 |3 |
|1 |1.3 |2 |3 |
|1 |2.7 |1 |4 |
|1 |3.4 |1 |5 |
|1 |3.4 |2 |5 |
|1 |3.4 |3 |5 |
|2 |0.8 |1 |1 |
|2 |2.3 |1 |2 |
|2 |5.9 |1 |3 |
+--------+-----+-----+-----+
我知道可以通过复制数据,删除extra
列
- 复制数据
- 删除
extra
列 - 执行
distinct
操作,这将导致原始示例中的数据 - 使用原始解决方案计算
index
列 - 将结果与第二个示例的数据一起加入
但是,这将涉及许多额外的计算和开销。
您可以使用Window
函数来创建基于value
的等级列,由group_id
分区:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank
# Define window
window = Window.partitionBy(df['group_id']).orderBy(df['value'])
# Create column
df.select('*', rank().over(window).alias('index')).show()
+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
| 1| -1.7| 1|
| 1| 0.0| 2|
| 1| 1.3| 3|
| 1| 2.7| 4|
| 1| 3.4| 5|
| 2| 0.8| 1|
| 2| 2.3| 2|
| 2| 5.9| 3|
+--------+-----+-----+
因为,您首先选择'*'
,因此您还使用上述代码保留所有其他变量。但是,您的第二个示例表明您正在寻找函数dense_rank()
,该函数以排名列而没有空白:
df.select('*', dense_rank().over(window).alias('index'))