I have a PySpark dataframe similar to the one below:
+------------------------------------+-------------------+--------+------------------+-----------+
|ID |timestamp |accuracy|lat |lon |
+------------------------------------+-------------------+--------+------------------+-----------+
|5ab75a28-ed7f-4819-b781-640f8784ca6b|2020-01-01 04:54:04|13.2 |32.768036 |-97.0996884|
|eef755ef-de7b-4de1-931d-42d204990a07|2020-01-01 09:13:33|16.0 |21.377116 |-157.92295 |
|eef755ef-de7b-4de1-931d-42d204990a07|2020-01-01 09:13:33|16.0 |21.377116 |-157.92295 |
|eef755ef-de7b-4de1-931d-42d204990a07|2020-01-01 09:13:33|16.0 |21.377116 |-157.92295 |
|6408d32b-5e64-43cb-9022-79a1baf8f1a0|2020-01-01 14:00:11|14.1 |25.8491957 |-80.1753329|
|52ad3006-a105-4501-b9f8-9658edc6380a|2020-01-01 21:03:25|12.97 |33.016752499999996|-96.8533025|
|3676d46f-429c-4706-b43a-b1370391c1d0|2020-01-01 22:29:22|8.0 |29.536905 |-95.265337 |
|3676d46f-429c-4706-b43a-b1370391c1d0|2020-01-01 22:29:22|8.0 |29.536905 |-95.265337 |
+------------------------------------+-------------------+--------+------------------+-----------+
I want to associate an integer with each ID, in a column code, like below:
df['code']
0
1
1
1
2
3
4
4
If I do
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window
df.withColumn("code", dense_rank().over(Window.orderBy("timestamp"))).show()
I get
+--------------------+-------------------+------------------+------------------+-------------------+----+
| ID| timestamp| accuracy| lat| lon|code|
+--------------------+-------------------+------------------+------------------+-------------------+----+
|59ae42df-37f3-4d5...|2020-01-01 00:00:00| 2.0| 39.53418731689453|-105.03048706054688| 1|
|2fd86c2c-eb94-42f...|2020-01-01 00:00:00| 6.0| 40.4071159362793| -92.56725311279297| 1|
|3739eb5b-ee20-440...|2020-01-01 00:00:00| 4.0| 44.7714958190918| -85.0419692993164| 1|
|6daab4ca-52c5-436...|2020-01-01 00:00:00|15.100000381469727|32.661895751953125|-115.45074462890625| 1|
|0162e422-5b2b-49b...|2020-01-01 00:00:00| 16.0|34.127418518066406| -84.67899322509766| 1|
|92d321cf-e907-4c6...|2020-01-01 00:00:00| 10.0|37.662391662597656| -76.42019653320312| 1|
|3c1835a9-35a5-49d...|2020-01-01 00:00:00|12.300000190734863|32.696022033691406| -97.17048645019531| 1|
|1ab5a5bf-4511-44c...|2020-01-01 00:00:00| 17.0|40.704708099365234| -73.91097259521484| 1|
|4a8b1299-1485-443...|2020-01-01 00:00:00|12.399999618530273| 39.25| -78.08999633789062| 1|
|7ba6c144-b7c2-4bf...|2020-01-01 00:00:00| 17.0|30.124404907226562| -95.55699920654297| 1|
|3290f184-7b56-414...|2020-01-01 00:00:00| 10.0| 36.62535095214844| -119.3048095703125| 1|
|f677d8e3-2fc6-47a...|2020-01-01 00:00:00| 5.0| 27.9086856842041| -82.51069641113281| 1|
|bcab074a-c5dd-406...|2020-01-01 00:00:00| 5.0| 34.4466667175293| -82.4963150024414| 1|
|1bb2532d-eb3c-47b...|2020-01-01 00:00:00| 5.0|27.372886657714844| -80.41101837158203| 1|
|8c6692c1-cb82-441...|2020-01-01 00:00:00| 8.0|29.719207763671875| -95.76821899414062| 1|
|d427fae2-26f7-47f...|2020-01-01 00:00:00| 5.0|31.151153564453125| -82.45683288574219| 1|
|db5c0e25-55df-44c...|2020-01-01 00:00:00|12.300000190734863| 40.85110092163086| -73.88435363769531| 1|
|2f61f080-65aa-4e5...|2020-01-01 00:00:00| 12.0| 36.2707405090332| -80.37544250488281| 1|
|ab9c61bb-7760-472...|2020-01-01 00:00:00| 15.0| 45.99789810180664|-112.59880065917969| 1|
|4b6a8d0f-ffd4-492...|2020-01-01 00:00:00| 5.0| 35.16313934326172| -80.95419311523438| 1|
+--------------------+-------------------+------------------+------------------+-------------------+----+
You can try dense_rank:
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window
df.withColumn("code", dense_rank().over(Window.orderBy("ID"))).show()
Output:
+--------------------+-------------------+------------------+------------------+-------------------+----+
| ID| timestamp| accuracy| lat| lon|code|
+--------------------+-------------------+------------------+------------------+-------------------+----+
|0162e422-5b2b-49b...|2020-01-01 00:00:00| 16.0| 34.12741851806641| -84.67899322509766| 1|
|1ab5a5bf-4511-44c...|2020-01-01 00:00:00| 17.0| 40.70470809936523| -73.91097259521484| 2|
|1bb2532d-eb3c-47b...|2020-01-01 00:00:00| 5.0|27.372886657714844| -80.41101837158203| 3|
|2f61f080-65aa-4e5...|2020-01-01 00:00:00| 12.0| 36.2707405090332| -80.37544250488281| 4|
|2fd86c2c-eb94-42f...|2020-01-01 00:00:00| 6.0| 40.4071159362793| -92.56725311279295| 5|
|3290f184-7b56-414...|2020-01-01 00:00:00| 10.0| 36.62535095214844| -119.3048095703125| 6|
|3739eb5b-ee20-440...|2020-01-01 00:00:00| 4.0| 44.7714958190918| -85.0419692993164| 7|
|3c1835a9-35a5-49d...|2020-01-01 00:00:00|12.300000190734865| 32.69602203369141| -97.17048645019531| 8|
|4a8b1299-1485-443...|2020-01-01 00:00:00|12.399999618530273| 39.25| -78.08999633789062| 9|
|4b6a8d0f-ffd4-492...|2020-01-01 00:00:00| 5.0| 35.16313934326172| -80.95419311523438| 10|
|59ae42df-37f3-4d5...|2020-01-01 00:00:00| 2.0| 39.53418731689453|-105.03048706054688| 11|
|6daab4ca-52c5-436...|2020-01-01 00:00:00|15.100000381469727|32.661895751953125|-115.45074462890624| 12|
|7ba6c144-b7c2-4bf...|2020-01-01 00:00:00| 17.0| 30.12440490722656| -95.55699920654295| 13|
|8c6692c1-cb82-441...|2020-01-01 00:00:00| 8.0|29.719207763671875| -95.76821899414062| 14|
|92d321cf-e907-4c6...|2020-01-01 00:00:00| 10.0|37.662391662597656| -76.42019653320312| 15|
|ab9c61bb-7760-472...|2020-01-01 00:00:00| 15.0| 45.99789810180664|-112.59880065917969| 16|
|bcab074a-c5dd-406...|2020-01-01 00:00:00| 5.0|34.446666717529304| -82.4963150024414| 17|
|d427fae2-26f7-47f...|2020-01-01 00:00:00| 5.0|31.151153564453125| -82.45683288574219| 18|
|db5c0e25-55df-44c...|2020-01-01 00:00:00|12.300000190734865| 40.85110092163086| -73.88435363769531| 19|
|f677d8e3-2fc6-47a...|2020-01-01 00:00:00| 5.0| 27.9086856842041| -82.51069641113281| 20|
+--------------------+-------------------+------------------+------------------+-------------------+----+
If you want the codes to start from 0, just subtract 1.
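A minimal sketch of that, reusing the dense_rank window from above (the column name code is taken from the question):

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

# dense_rank() starts at 1, so subtracting 1 gives 0-based codes
df.withColumn("code", dense_rank().over(Window.orderBy("ID")) - 1).show()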
Try the row_number function:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

df.withColumn("row_number", f.row_number().over(Window.orderBy("timestamp")))
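Note that row_number assigns a distinct, consecutive number to every row, so the duplicate rows in the question (same ID and timestamp) would each get a different number, whereas dense_rank gives tied rows the same value. A quick side-by-side sketch to illustrate (ordering the window by ID is an assumption, chosen to match the per-ID codes the question asks for):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window.orderBy("ID")
df.withColumn("row_number", f.row_number().over(w)) \
  .withColumn("code", f.dense_rank().over(w) - 1) \
  .show()
# row_number: 1, 2, 3, ... even when the ID repeats
# dense_rank - 1: repeated IDs share the same code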