在Spark DataFrame中为每个组创建索引



i在Spark中有一个带有2列group_idvalue的数据框,其中value是双重的。我想根据group_id对数据进行分组,通过value订购每个组,然后添加第三列index,该列代表value在组值的顺序中的位置。

例如,考虑以下输入数据:

+--------+-----+
|group_id|value|
+--------+-----+
|1       |1.3  |
|2       |0.8  |
|1       |3.4  |
|1       |-1.7 |
|2       |2.3  |
|2       |5.9  |
|1       |2.7  |
|1       |0.0  |
+--------+-----+

然后输出将是

+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|1       |-1.7 |1    |
|1       |0.0  |2    |
|1       |1.3  |3    |
|1       |2.7  |4    |
|1       |3.4  |5    |
|2       |0.8  |1    |
|2       |2.3  |2    |
|2       |5.9  |3    |
+--------+-----+-----+

如果索引基于0以及排序是上升还是下降。

是不重要的。

作为后续行动,请考虑原始数据中有第三列extra的情况,该数据对某些(group_id, value)组合进行了多个值。一个例子是:

+--------+-----+-----+
|group_id|value|extra|
+--------+-----+-----+
|1       |1.3  |1    |
|1       |1.3  |2    |
|2       |0.8  |1    |
|1       |3.4  |1    |
|1       |3.4  |2    |
|1       |3.4  |3    |
|1       |-1.7 |1    |
|2       |2.3  |1    |
|2       |5.9  |1    |
|1       |2.7  |1    |
|1       |0.0  |1    |
+--------+-----+-----+

有没有办法添加index列,以使extra列不考虑而是仍然保留?在这种情况下的输出将为

+--------+-----+-----+-----+
|group_id|value|extra|index|
+--------+-----+-----+-----+
|1       |-1.7 |1    |1    |
|1       |0.0  |1    |2    |
|1       |1.3  |1    |3    |
|1       |1.3  |2    |3    |
|1       |2.7  |1    |4    |
|1       |3.4  |1    |5    |
|1       |3.4  |2    |5    |
|1       |3.4  |3    |5    |
|2       |0.8  |1    |1    |
|2       |2.3  |1    |2    |
|2       |5.9  |1    |3    |
+--------+-----+-----+-----+

我知道可以通过复制数据,删除extra

来执行此操作。
  1. 复制数据
  2. 删除extra
  3. 执行distinct操作,这将导致原始示例中的数据
  4. 使用原始解决方案计算index
  5. 将结果与第二个示例的数据一起加入

但是,这将涉及许多额外的计算和开销。

您可以使用Window函数来创建基于value的等级列,由group_id分区:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank
# Define window
window = Window.partitionBy(df['group_id']).orderBy(df['value'])
# Create column
df.select('*', rank().over(window).alias('index')).show()
+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|       1| -1.7|    1|
|       1|  0.0|    2|
|       1|  1.3|    3|
|       1|  2.7|    4|
|       1|  3.4|    5|
|       2|  0.8|    1|
|       2|  2.3|    2|
|       2|  5.9|    3|
+--------+-----+-----+

因为,您首先选择'*',因此您还使用上述代码保留所有其他变量。但是,您的第二个示例表明您正在寻找函数dense_rank(),该函数以排名列而没有空白:

df.select('*', dense_rank().over(window).alias('index'))

相关内容

  • 没有找到相关文章

最新更新