在Spark版本2.2中,使用row_number()函数为PySpark DataFrame中的每一行创建一个行号



我有一个PySpark DataFrame-

valuesCol = [('Sweden',31),('Norway',62),('Iceland',13),('Finland',24),('Denmark',52)]
df = sqlContext.createDataFrame(valuesCol,['name','id'])
+-------+---+
|   name| id|
+-------+---+
| Sweden| 31|
| Norway| 62|
|Iceland| 13|
|Finland| 24|
|Denmark| 52|
+-------+---+

我想在这个DataFrame中添加一行列,这是行的行号(序列号(,如下所示-

我的最终输出应该是:

+-------+---+--------+
|   name| id|row_num |
+-------+---+--------+
| Sweden| 31|       1|
| Norway| 62|       2|
|Iceland| 13|       3|
|Finland| 24|       4|
|Denmark| 52|       5|
+-------+---+--------+

我的Spark版本是2.2

我正在尝试这个代码,但它不起作用-

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().orderBy()
df = df.withColumn("row_num", row_number().over(w))
df.show()

我收到一个错误:

AnalysisException: 'Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;'

如果我理解正确的话,我需要订购一些列,但我不想要像w = Window().orderBy('id')这样的东西,因为这会重新排序整个DataFrame。

有人能建议如何使用row_number()功能实现上述输出吗?

您应该为order子句定义列。如果不需要对值进行排序,则编写一个伪值。请尝试以下内容;

from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))

我也遇到过类似的问题,但在我的案例中,@Ali Yesilli的解决方案失败了,因为我分别读取了多个输入文件,并最终将它们统一在一个数据帧中。在这种情况下,由伪变量排序的窗口内的顺序被证明是不可预测的。

因此,为了实现更稳健的排序,我使用了monotonically_increasing_id:

df = df.withColumn('original_order', monotonically_increasing_id())
df = df.withColumn('row_num', row_number().over(Window.orderBy('original_order')))
df = df.drop('original_order')

相关内容

  • 没有找到相关文章

最新更新