I want the 'idx' column to be generated automatically, step by step! I've already shown below how to do it manually!
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('vin', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('var', IntegerType(), True),
    StructField('rim', IntegerType(), True),
    StructField('cap', IntegerType(), True),
    StructField('cur', IntegerType(), True),
])
data = [['tom', 10,54,87,23,90], ['nick', 15,63,23,11,65], ['juli', 14,87,9,43,21]]
data_1=['sam',60,45,34,12,67]
df=spark.createDataFrame(data,schema)
df = df.withColumn('idx', monotonically_increasing_id()).union(spark.createDataFrame([data_1], schema))
# this fails: the column counts differ after adding 'idx', and the generated ids are not sequential
#--------------------------------------------------------------------
# I could do it this way, but I want it to be automated!
df = (df.withColumn('idx', F.row_number().over(Window.orderBy('age')))
        .union(spark.createDataFrame([data_1 + [4]])))
#---------------------------------------------------------------------
df.show()
#Expected outcome:
#>>>
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90| 1|
|juli| 14| 87| 9| 43| 21| 2|
|nick| 15| 63| 23| 11| 65| 3|
| sam| 60| 45| 34| 12| 67| 4|
+----+---+---+---+---+---+---+
You can take the maximum idx from the original df and add it to the idx of the new df.
from pyspark.sql import functions as F, Window
df = df.withColumn('idx',F.row_number().over(Window.orderBy('age')))
df.show()
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90| 1|
|juli| 14| 87| 9| 43| 21| 2|
|nick| 15| 63| 23| 11| 65| 3|
+----+---+---+---+---+---+---+
df2 = df.union(
    spark.createDataFrame([data_1], schema).withColumn(
        'idx',
        F.row_number().over(Window.orderBy('age')) + F.lit(df.select(F.max('idx')).head()[0])
    )
)
df2.show()
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90| 1|
|juli| 14| 87| 9| 43| 21| 2|
|nick| 15| 63| 23| 11| 65| 3|
| sam| 60| 45| 34| 12| 67| 4|
+----+---+---+---+---+---+---+
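The logic of this answer can be sketched in plain Python (no Spark session needed; the data here is the same hypothetical example as above): the idx of each appended row is the old maximum idx plus its 1-based rank among the new rows.

```python
existing = [('tom', 10, 1), ('juli', 14, 2), ('nick', 15, 3)]  # (vin, age, idx) rows of df
new_rows = [('sam', 60)]                                       # rows to append, without idx

offset = max(idx for *_, idx in existing)       # like df.select(F.max('idx')).head()[0]
ranked = sorted(new_rows, key=lambda r: r[1])   # like F.row_number().over(Window.orderBy('age'))
appended = [(vin, age, offset + i) for i, (vin, age) in enumerate(ranked, start=1)]
result = existing + appended
print(result[-1])  # ('sam', 60, 4)
```

Because only the offset depends on the existing data, this works unchanged when appending several new rows at once.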
You can run row_number again after the union, and update idx only where it is null:
from pyspark.sql import functions as F, Window

data_1 = ['sam', 60, 45, 34, 12, 67]
df = (df.withColumn('idx', F.row_number().over(Window.orderBy('age')))
        .union(spark.createDataFrame([data_1]).withColumn('idx', F.lit(None)))
        .withColumn('idx', F.coalesce('idx', F.row_number().over(Window.orderBy('age')))))
df.show()
#+----+---+---+---+---+---+---+
#| vin|age|var|rim|cap|cur|idx|
#+----+---+---+---+---+---+---+
#| tom| 10| 54| 87| 23| 90| 1|
#|juli| 14| 87| 9| 43| 21| 2|
#|nick| 15| 63| 23| 11| 65| 3|
#| sam| 60| 45| 34| 12| 67| 4|
#+----+---+---+---+---+---+---+
However, if you add a row with age smaller than the existing df.age values, you need to regenerate the row numbers ordered by the age column; otherwise your idx column will be incorrect.
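This caveat can be illustrated in plain Python (hypothetical data, no Spark needed): when a new row sorts before the existing ones, a coalesce-style fill keeps the stale numbering, while a full re-rank over the union assigns correct positions.

```python
rows = [('tom', 10, 1), ('juli', 14, 2), ('nick', 15, 3)]  # (vin, age, idx)
new = ('amy', 5)                                           # age 5 sorts before every existing row

# coalesce-style fill: old idx values are kept and the new row gets max+1,
# so amy wrongly ends up numbered last
filled = rows + [(*new, max(i for *_, i in rows) + 1)]

# correct: recompute the row number over the whole union, ordered by age
merged = sorted([(v, a) for v, a, _ in rows] + [new], key=lambda r: r[1])
reranked = [(v, a, i) for i, (v, a) in enumerate(merged, start=1)]
print(reranked[0])  # ('amy', 5, 1)
```

In PySpark terms, the fix is to apply F.row_number().over(Window.orderBy('age')) to the unioned DataFrame instead of coalescing with the old idx.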