How can I forward-fill and backward-fill within each group in PySpark? For example, suppose we group the data by the column id and order the rows with missing values by the column order:
df = spark.createDataFrame([
('a', 1.0, 1.0),
('b', 1.0, 2.0),
('a', 2.0, float("nan")),
('b', 2.0, float("nan")),
('a', 3.0, 3.0),
('b', 3.0, 4.0)],
["id", "order", "values"])
+---+-----+------+
| id|order|values|
+---+-----+------+
| a| 1.0| 1.0|
| b| 1.0| 2.0|
| a| 2.0| NaN|
| b| 2.0| NaN|
| a| 3.0| 3.0|
| b| 3.0| 4.0|
+---+-----+------+
Expected result of a forward fill:
+---+-----+------+
| id|order|values|
+---+-----+------+
| a| 1.0| 1.0|
| b| 1.0| 2.0|
| a| 2.0| 1.0|
| b| 2.0| 2.0|
| a| 3.0| 3.0|
| b| 3.0| 4.0|
+---+-----+------+
Expected result of a backward fill:
+---+-----+------+
| id|order|values|
+---+-----+------+
| a| 1.0| 1.0|
| b| 1.0| 2.0|
| a| 2.0| 3.0|
| b| 2.0| 4.0|
| a| 3.0| 3.0|
| b| 3.0| 4.0|
+---+-----+------+
Try first replacing the nan values with nulls, then using coalesce combined with the last and first functions (with ignoreNulls set to true) over a window, as in this example:
import pyspark.sql.functions as F

# Forward fill looks back from the current row; backward fill looks ahead.
ffill_window = "(partition by id order by order rows between unbounded preceding and current row)"
bfill_window = "(partition by id order by order rows between current row and unbounded following)"

(df
 # NaN is not null in Spark, so convert it first so that ignoreNulls can skip it.
 .withColumn("values", F.expr("case when isnan(values) then null else values end"))
 .withColumn("values_ffill", F.expr(f"coalesce(values, last(values, true) over {ffill_window})"))
 .withColumn("values_bfill", F.expr(f"coalesce(values, first(values, true) over {bfill_window})"))
).show()
# +---+-----+------+------------+------------+
# | id|order|values|values_ffill|values_bfill|
# +---+-----+------+------------+------------+
# | b| 1.0| 2.0| 2.0| 2.0|
# | b| 2.0| null| 2.0| 4.0|
# | b| 3.0| 4.0| 4.0| 4.0|
# | a| 1.0| 1.0| 1.0| 1.0|
# | a| 2.0| null| 1.0| 3.0|
# | a| 3.0| 3.0| 3.0| 3.0|
# +---+-----+------+------------+------------+