查找再次到达值的行



我有一个这样的数据框架:

+-----+----+----+--------+
|index|name| Num|solution|
+-----+----+----+--------+
|    0|   a|1000|    true|
|    1|   a|2000|    true|
|    2|   a| 300|   false|
|    3|   a| 400|    true|
|    4|   a|2100|    true|
|    5|   a|2200|    true|
+-----+----+----+--------+

我现在想要更新我的解决方案列。如果第一次达到或超过"drop"之前的值(Num)(在这里,drop"之前的值是2000),我想将所有bool从"drop"设置为False。直到那个点之后。所以预期的结果是:

+-----+----+----+---------------+
|index|name| Num|solution_update|
+-----+----+----+---------------+
|    0|   a|1000|           true|
|    1|   a|2000|           true|
|    2|   a| 300|          false|
|    3|   a| 400|          false|
|    4|   a|2100|          false|
|    5|   a|2200|           true|
+-----+----+----+---------------+

我觉得我错过了如何解决这个问题的总体思路:-

我可以检测掉前一行的值:

my_window = Window.partitionBy('name').orderBy(F.col('index'))
df= df.withColumn('lag1', F.lag(F.col('Num'), -1)
.over(my_window).cast('bigint'))
df= df.withColumn('help',
(F.when((F.col('lag1'))
< (F.col('Num')), False)))
+-----+----+----+--------+----+-----+
|index|name| Num|solution|lag1| help|
+-----+----+----+--------+----+-----+
|    0|   a|1000|    true|2000| null|
|    1|   a|2000|    true| 300|false|
|    2|   a| 300|   false| 400| null|
|    3|   a| 400|    true|2100| null|
|    4|   a|2100|    true|2200| null|
|    5|   a|2200|    true|null| null|
+-----+----+----+--------+----+-----+

但是现在我不知道如何搜索"第一个值等于或大于"比

df.where(F.col('help')==False)['Num']

有人能帮忙吗?

这并不容易。我是这样做的。希望这些列是足够自解释的:)但是如果你不清楚任何列的含义,请询问。

from pyspark.sql import functions as F, Window
my_window = Window.partitionBy('name').orderBy(F.col('index'))
df2 = df.withColumn(
'drop',
F.when(F.col('Num') < F.lag('Num').over(my_window), F.lag('Num').over(my_window))
).withColumn(
'num_before_drop',
F.last('drop', ignorenulls=True).over(my_window)
).withColumn(
'surpass',
F.col('Num') > F.col('num_before_drop')
).withColumn(
'first_surpass',
F.col('surpass') & ~F.lag('surpass').over(my_window)
).withColumn(
'solution_update',
F.when(~F.col('surpass') | F.col('first_surpass'), F.lit(False))
.otherwise(F.col('solution'))
)
df2.show()
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
|index|name| Num|solution|drop|num_before_drop|surpass|first_surpass|solution_update|
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
|    0|   a|1000|    true|null|           null|   null|         null|           true|
|    1|   a|2000|    true|null|           null|   null|         null|           true|
|    2|   a| 300|   false|2000|           2000|  false|        false|          false|
|    3|   a| 400|    true|null|           2000|  false|        false|          false|
|    4|   a|2100|    true|null|           2000|   true|         true|          false|
|    5|   a|2200|    true|null|           2000|   true|        false|           true|
+-----+----+----+--------+----+---------------+-------+-------------+---------------+

还有另一种方法:

from pyspark.sql.window import Window
from pyspark.sql import functions as F
win = Window.partitionBy('name').orderBy('index')

# Get "largest so far"
df = df.withColumn('max_num', F.max('Num').over(win))
df = df.withColumn('running_max', F.expr('max_num > Num'))
# Now lag the largest so far and form a combined indicator
df = df.withColumn('lag_running_max', F.lag('running_max').over(win)).na.fill(False)
df =  df.withColumn('combined_indicator', F.expr("running_max or lag_running_max"))
# update solution column
df = df.withColumn('updated_solution', F.when(F.col("combined_indicator"), False).otherwise(F.col("solution")))

相关内容

  • 没有找到相关文章

最新更新