How to add a row in PySpark with an auto-incrementing index?



I want the "idx" column to be filled in automatically and incrementally! I've shown below how I can do it manually!

from pyspark.sql import functions as F, Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('vin', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('var', IntegerType(), True),
    StructField('rim', IntegerType(), True),
    StructField('cap', IntegerType(), True),
    StructField('cur', IntegerType(), True),
])
data = [['tom', 10, 54, 87, 23, 90], ['nick', 15, 63, 23, 11, 65], ['juli', 14, 87, 9, 43, 21]]
data_1 = ['sam', 60, 45, 34, 12, 67]
df = spark.createDataFrame(data, schema)
# This gives unique but non-consecutive ids, so it is not what I want:
df = df.withColumn('idx', F.monotonically_increasing_id()) \
       .union(spark.createDataFrame([data_1]).withColumn('idx', F.monotonically_increasing_id()))
#--------------------------------------------------------------------
#I could do it in this way below, but I want it to be automated!
df = df.withColumn('idx', F.row_number().over(Window.orderBy('age'))) \
       .union(spark.createDataFrame([data_1 + [4]]))
#---------------------------------------------------------------------
df.show()
#Expected outcome:
#>>>
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90|  1|
|juli| 14| 87|  9| 43| 21|  2|
|nick| 15| 63| 23| 11| 65|  3|
| sam| 60| 45| 34| 12| 67|  4|
+----+---+---+---+---+---+---+
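Note: monotonically_increasing_id() only guarantees ids that are unique and increasing; the values depend on the partition layout, so they are generally not consecutive, which is why my first attempt above doesn't give 1, 2, 3, ... A minimal sketch of that behavior, with made-up example values in the comment:

# The ids roughly encode (partition_id << 33) + position within the partition,
# so with several partitions they jump, e.g. 0, 8589934592, ... instead of 1, 2, 3
spark.createDataFrame(data, schema) \
    .repartition(3) \
    .withColumn('idx', F.monotonically_increasing_id()) \
    .show()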

You can get the max idx from the original df and add it to the idx of the new df.

from pyspark.sql import functions as F, Window
df = df.withColumn('idx',F.row_number().over(Window.orderBy('age')))
df.show()
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90|  1|
|juli| 14| 87|  9| 43| 21|  2|
|nick| 15| 63| 23| 11| 65|  3|
+----+---+---+---+---+---+---+
df2 = df.union(
    spark.createDataFrame([data_1], schema).withColumn(
        'idx',
        F.row_number().over(Window.orderBy('age')) + F.lit(df.select(F.max('idx')).head()[0])
    )
)
df2.show()
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90|  1|
|juli| 14| 87|  9| 43| 21|  2|
|nick| 15| 63| 23| 11| 65|  3|
| sam| 60| 45| 34| 12| 67|  4|
+----+---+---+---+---+---+---+
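If rows get appended repeatedly, the same idea can be wrapped in a small helper. A minimal sketch (the helper name append_with_idx is mine, not part of the original answer; it reuses spark, F, Window and the schema from above):

def append_with_idx(df, rows, schema):
    # Continue idx from the current maximum (or from 0 for an empty frame)
    offset = df.select(F.max('idx')).head()[0] or 0
    new_rows = spark.createDataFrame(rows, schema).withColumn(
        'idx', F.row_number().over(Window.orderBy('age')) + F.lit(offset)
    )
    return df.union(new_rows)

df2 = append_with_idx(df, [data_1], schema)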

You can run row_number again after the union and update idx only where it is null.

from pyspark.sql import functions as F, Window

data_1 = ['sam', 60, 45, 34, 12, 67]
df = (df.withColumn('idx', F.row_number().over(Window.orderBy('age')))
        .union(spark.createDataFrame([data_1]).withColumn('idx', F.lit(None)))
        .withColumn('idx', F.coalesce('idx', F.row_number().over(Window.orderBy('age')))))
df.show()
#+----+---+---+---+---+---+---+
#| vin|age|var|rim|cap|cur|idx|
#+----+---+---+---+---+---+---+
#| tom| 10| 54| 87| 23| 90|  1|
#|juli| 14| 87|  9| 43| 21|  2|
#|nick| 15| 63| 23| 11| 65|  3|
#| sam| 60| 45| 34| 12| 67|  4|
#+----+---+---+---+---+---+---+

However, if you add a row whose age is smaller than the existing ages in df, the row numbers need to be regenerated when ordering by the age column; otherwise your idx column will be wrong.
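A minimal sketch of that case, assuming the simplest fix of dropping and regenerating idx after the union (data_2 is a made-up row with a smaller age than any existing row):

data_2 = ['ann', 5, 11, 22, 33, 44]
df = (df.drop('idx')
        .union(spark.createDataFrame([data_2], schema))
        .withColumn('idx', F.row_number().over(Window.orderBy('age'))))
# 'ann' (age 5) now gets idx 1 and every other row shifts up by one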
