我有一个这样的数据框架:
+-----+----+----+--------+
|index|name| Num|solution|
+-----+----+----+--------+
| 0| a|1000| true|
| 1| a|2000| true|
| 2| a| 300| false|
| 3| a| 400| true|
| 4| a|2100| true|
| 5| a|2200| true|
+-----+----+----+--------+
我现在想要更新我的解决方案列。如果第一次达到或超过"drop"之前的值(Num
)(在这里,drop"之前的值是2000),我想将所有bool从"drop"设置为False。直到那个点之后。所以预期的结果是:
+-----+----+----+---------------+
|index|name| Num|solution_update|
+-----+----+----+---------------+
| 0| a|1000| true|
| 1| a|2000| true|
| 2| a| 300| false|
| 3| a| 400| false|
| 4| a|2100| false|
| 5| a|2200| true|
+-----+----+----+---------------+
我觉得我错过了如何解决这个问题的总体思路:-
我可以检测掉前一行的值:
my_window = Window.partitionBy('name').orderBy(F.col('index'))
df= df.withColumn('lag1', F.lag(F.col('Num'), -1)
.over(my_window).cast('bigint'))
df= df.withColumn('help',
(F.when((F.col('lag1'))
< (F.col('Num')), False)))
+-----+----+----+--------+----+-----+
|index|name| Num|solution|lag1| help|
+-----+----+----+--------+----+-----+
| 0| a|1000| true|2000| null|
| 1| a|2000| true| 300|false|
| 2| a| 300| false| 400| null|
| 3| a| 400| true|2100| null|
| 4| a|2100| true|2200| null|
| 5| a|2200| true|null| null|
+-----+----+----+--------+----+-----+
但是现在我不知道如何搜索"第一个值等于或大于"比
df.where(F.col('help')==False)['Num']
有人能帮忙吗?
这并不容易。我是这样做的。希望这些列是足够自解释的:)但是如果你不清楚任何列的含义,请询问。
from pyspark.sql import functions as F, Window
my_window = Window.partitionBy('name').orderBy(F.col('index'))
df2 = df.withColumn(
'drop',
F.when(F.col('Num') < F.lag('Num').over(my_window), F.lag('Num').over(my_window))
).withColumn(
'num_before_drop',
F.last('drop', ignorenulls=True).over(my_window)
).withColumn(
'surpass',
F.col('Num') > F.col('num_before_drop')
).withColumn(
'first_surpass',
F.col('surpass') & ~F.lag('surpass').over(my_window)
).withColumn(
'solution_update',
F.when(~F.col('surpass') | F.col('first_surpass'), F.lit(False))
.otherwise(F.col('solution'))
)
df2.show()
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
|index|name| Num|solution|drop|num_before_drop|surpass|first_surpass|solution_update|
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
| 0| a|1000| true|null| null| null| null| true|
| 1| a|2000| true|null| null| null| null| true|
| 2| a| 300| false|2000| 2000| false| false| false|
| 3| a| 400| true|null| 2000| false| false| false|
| 4| a|2100| true|null| 2000| true| true| false|
| 5| a|2200| true|null| 2000| true| false| true|
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
还有另一种方法:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
win = Window.partitionBy('name').orderBy('index')
# Get "largest so far"
df = df.withColumn('max_num', F.max('Num').over(win))
df = df.withColumn('running_max', F.expr('max_num > Num'))
# Now lag the largest so far and form a combined indicator
df = df.withColumn('lag_running_max', F.lag('running_max').over(win)).na.fill(False)
df = df.withColumn('combined_indicator', F.expr("running_max or lag_running_max"))
# update solution column
df = df.withColumn('updated_solution', F.when(F.col("combined_indicator"), False).otherwise(F.col("solution")))