Forward fill and backward fill per group in PySpark



How can I forward fill and backward fill missing values for each group in PySpark? For example, grouping the data by the column id and ordering within each group by the column order, with some values missing:

df = spark.createDataFrame([
    ('a', 1.0, 1.0),
    ('b', 1.0, 2.0),
    ('a', 2.0, float("nan")),
    ('b', 2.0, float("nan")),
    ('a', 3.0, 3.0),
    ('b', 3.0, 4.0)],
    ["id", "order", "values"])
+---+-----+------+
| id|order|values|
+---+-----+------+
|  a|  1.0|   1.0|
|  b|  1.0|   2.0|
|  a|  2.0|   NaN|
|  b|  2.0|   NaN|
|  a|  3.0|   3.0|
|  b|  3.0|   4.0|
+---+-----+------+

Expected result of the forward fill:

+---+-----+------+
| id|order|values|
+---+-----+------+
|  a|  1.0|   1.0|
|  b|  1.0|   2.0|
|  a|  2.0|   1.0|
|  b|  2.0|   2.0|
|  a|  3.0|   3.0|
|  b|  3.0|   4.0|
+---+-----+------+

Expected result of the backward fill:

+---+-----+------+
| id|order|values|
+---+-----+------+
|  a|  1.0|   1.0|
|  b|  1.0|   2.0|
|  a|  2.0|   3.0|
|  b|  2.0|   4.0|
|  a|  3.0|   3.0|
|  b|  3.0|   4.0|
+---+-----+------+

One approach is to first replace the NaN values with nulls, and then use coalesce together with the last/first functions (with ignoreNulls set to true) over a window, as in the example below:

import pyspark.sql.functions as F

# Forward fill frame: from the start of the partition up to the current row.
ffill_window = "(partition by id order by order rows between unbounded preceding and current row)"
# Backward fill frame: from the current row to the end of the partition.
bfill_window = "(partition by id order by order rows between current row and unbounded following)"
(df
 # last/first ignore nulls but not NaN, so convert NaN to null first.
 .withColumn("values", F.expr("case when isnan(values) then null else values end"))
 .withColumn("values_ffill", F.expr(f"coalesce(values, last(values, true) over {ffill_window})"))
 .withColumn("values_bfill", F.expr(f"coalesce(values, first(values, true) over {bfill_window})"))
).show()
# +---+-----+------+------------+------------+
# | id|order|values|values_ffill|values_bfill|
# +---+-----+------+------------+------------+
# |  b|  1.0|   2.0|         2.0|         2.0|
# |  b|  2.0|  null|         2.0|         4.0|
# |  b|  3.0|   4.0|         4.0|         4.0|
# |  a|  1.0|   1.0|         1.0|         1.0|
# |  a|  2.0|  null|         1.0|         3.0|
# |  a|  3.0|   3.0|         3.0|         3.0|
# +---+-----+------+------------+------------+
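
For reference, here is a minimal sketch of the same idea written with the DataFrame Window API instead of SQL expression strings (assuming the same df as above; the window variable names w_ffill and w_bfill are just illustrative):

from pyspark.sql import Window
import pyspark.sql.functions as F

# Per-id frames ordered by order: look backward for ffill, forward for bfill.
w_ffill = Window.partitionBy("id").orderBy("order") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
w_bfill = Window.partitionBy("id").orderBy("order") \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

(df
 # last/first with ignorenulls skip nulls but not NaN, so convert NaN to null first.
 .withColumn("values", F.when(F.isnan("values"), None).otherwise(F.col("values")))
 .withColumn("values_ffill", F.last("values", ignorenulls=True).over(w_ffill))
 .withColumn("values_bfill", F.first("values", ignorenulls=True).over(w_bfill))
).show()

Since both frames include the current row, last/first with ignorenulls already return the current value whenever it is not null, so the extra coalesce in the SQL version should not be strictly necessary.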