Sl Now
1 D
2 D
3 D
4 R
5 R
6 C
7 C
8 C
9 D
10 P
11 R
12 R
13 D
I have a dataset like the one above.
Sl Now lead
1 D R
2 D R
3 D R
4 R C
5 R C
6 C D
7 C D
8 C D
9 D P
10 P R
11 R D
12 R D
13 D
I want to add a column called "lead" that shows the next upcoming value in the "Now" column: for every row of a run of identical consecutive values, "lead" should hold the value of the next run, as shown above. Can this be done with PySpark?
Here is how I would do it.
Prepare the data

a = "DDDRRCCCDPRRD"
a = list(zip(range(len(a)), a))  # (Sl, now) pairs; Sl runs from 0 to 12 and only serves as an ordering key
b = ["Sl", "now"]
df = spark.createDataFrame(a, b)
df.show()
+---+---+
| Sl|now|
+---+---+
| 0| D|
| 1| D|
| 2| D|
| 3| R|
| 4| R|
| 5| C|
| 6| C|
| 7| C|
| 8| D|
| 9| P|
| 10| R|
| 11| R|
| 12| D|
+---+---+
Imports

from pyspark.sql import functions as F
from pyspark.sql import Window as W
Add an incremental id

df = df.withColumn("id", F.when(F.col("now") == F.lag("now").over(W.orderBy("Sl")), 0).otherwise(1))  # flag rows where the value changes (the first row's lag is null, so it is flagged too)
df = df.withColumn("id", F.sum("id").over(W.orderBy("Sl")))  # running sum of the flags: every run of identical values gets its own id
df.show()
+---+---+---+
| Sl|now| id|
+---+---+---+
| 0| D| 1|
| 1| D| 1|
| 2| D| 1|
| 3| R| 2|
| 4| R| 2|
| 5| C| 3|
| 6| C| 3|
| 7| C| 3|
| 8| D| 4|
| 9| P| 5|
| 10| R| 6|
| 11| R| 6|
| 12| D| 7|
+---+---+---+
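A practical caveat: W.orderBy("Sl") without a partitionBy pulls all rows into a single partition, and Spark warns about it. On a real dataset you would normally partition the window by some grouping key first. A minimal sketch, assuming a hypothetical device_id column to partition on:

w = W.partitionBy("device_id").orderBy("Sl")  # device_id is a hypothetical grouping column, not in the sample data
df = df.withColumn("id", F.when(F.col("now") == F.lag("now").over(w), 0).otherwise(1))
df = df.withColumn("id", F.sum("id").over(w))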
Get the lead values (method 1)
df1 = df.withColumn("lead", F.collect_set("now").over(W.orderBy("id").rangeBetween(1, 1)))  # collect the value(s) of the next run (id + 1); df itself is kept intact so its id column is still available for method 2
df1 = df1.select("Sl", "now", F.explode_outer("lead").alias("lead"))  # explode_outer keeps the last run, whose set is empty, as a null lead
df1.show()
+---+---+----+
| Sl|now|lead|
+---+---+----+
| 0| D| R|
| 1| D| R|
| 2| D| R|
| 3| R| C|
| 4| R| C|
| 5| C| D|
| 6| C| D|
| 7| C| D|
| 8| D| P|
| 9| P| R|
| 10| R| D|
| 11| R| D|
| 12| D|null|
+---+---+----+
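The rangeBetween(1, 1) frame ordered by id covers exactly the rows of the next run, so the collected set holds at most one value. If you would rather avoid the collect/explode pair, reading the same frame with first() should give the same result; this is a sketch of that variant, not part of the original answer:

df1 = df.withColumn("lead", F.first("now").over(W.orderBy("id").rangeBetween(1, 1)))  # first value of the next run; null when the frame is empty (last run)
df1.select("Sl", "now", "lead").show()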
Get the lead values (method 2)
df2 = df.select(F.col("now").alias("lead"), (F.col("id") - 1).alias("id")).distinct()  # one row per run: its value, keyed by the previous run's id
df3 = df.join(df2, how="left", on="id")  # each run picks up the next run's value; the last run finds no match and gets null
df3.select("Sl", "now", "lead").show()
+---+---+----+
| Sl|now|lead|
+---+---+----+
| 0| D| R|
| 1| D| R|
| 2| D| R|
| 3| R| C|
| 4| R| C|
| 5| C| D|
| 6| C| D|
| 7| C| D|
| 8| D| P|
| 9| P| R|
| 10| R| D|
| 11| R| D|
| 12| D|null|
+---+---+----+
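Note that a join does not guarantee the original row order, so on a real cluster the joined result can come back shuffled. Sorting by Sl before displaying makes the output deterministic (a small addition, not from the original answer):

df3.select("Sl", "now", "lead").orderBy("Sl").show()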