I have a Spark dataframe in which I need to create a window-partitioned column ("desired_outcome"). The column should be filled with non-null values: backfill each row with the first upcoming non-null value, and if no non-null value exists ahead, carry the last preceding non-null value forward.
Here are some example rows along with the desired output:
columns = ['user_id', 'date', 'date2', 'desired_outcome']
data = [
    ('1', '2022-01-01', None, '2022-01-05'),
    ('1', '2022-01-02', None, '2022-01-05'),
    ('1', '2022-01-03', None, '2022-01-05'),
    ('1', '2022-01-04', None, '2022-01-05'),
    ('1', '2022-01-05', '2022-01-05', '2022-01-05'),
    ('1', '2022-01-06', None, '2022-01-05'),
    ('1', '2022-01-07', None, '2022-01-05'),
    ('2', '2022-01-01', None, '2022-01-05'),
    ('2', '2022-01-02', None, '2022-01-05'),
    ('2', '2022-01-03', None, '2022-01-05'),
    ('2', '2022-01-04', None, '2022-01-05'),
    ('2', '2022-01-05', '2022-01-05', '2022-01-05'),
    ('2', '2022-01-06', None, '2022-01-09'),
    ('2', '2022-01-07', None, '2022-01-09'),
    ('2', '2022-01-08', None, '2022-01-09'),
    ('2', '2022-01-09', '2022-01-09', '2022-01-09'),
    ('2', '2022-01-10', None, '2022-01-09'),
    ('2', '2022-01-11', None, '2022-01-09'),
    ('2', '2022-01-12', None, '2022-01-09'),
    ('3', '2022-01-01', '2022-01-01', '2022-01-01'),
    ('3', '2022-01-02', None, '2022-01-05'),
    ('3', '2022-01-03', None, '2022-01-05'),
    ('3', '2022-01-04', None, '2022-01-05'),
    ('3', '2022-01-05', '2022-01-05', '2022-01-05'),
    ('3', '2022-01-06', None, '2022-01-05'),
    ('3', '2022-01-07', None, '2022-01-05'),
    ('3', '2022-01-08', None, '2022-01-05'),
    ('3', '2022-01-09', None, '2022-01-05'),
    ('3', '2022-01-10', None, '2022-01-05'),
    ('3', '2022-01-11', None, '2022-01-05'),
    ('3', '2022-01-12', None, '2022-01-05'),
]
sample_df = spark.createDataFrame(data, columns)
I tried the following solution, but it doesn't fully reproduce the "desired_outcome" column (it only forward-fills, so rows before the first non-null value stay null):
from pyspark.sql import Window
from pyspark.sql.functions import last

window = (
    Window
    .partitionBy('user_id')
    .orderBy('date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
sample_df = sample_df.withColumn('backfill', last('date2', ignorenulls=True).over(window))
You can do this with two windows: one that looks forward and returns the first non-null value, and another that looks backward and returns the last non-null value. `coalesce` then prefers the forward-looking result and falls back to the backward-looking one.
from pyspark.sql import functions as F, Window as W
w_following = W.partitionBy('user_id').orderBy('date').rowsBetween(0, W.unboundedFollowing)
w_preceding = W.partitionBy('user_id').orderBy('date').rowsBetween(W.unboundedPreceding, 0)
sample_df = sample_df.withColumn(
    'date3',
    F.coalesce(
        F.first('date2', True).over(w_following),
        F.last('date2', True).over(w_preceding)
    )
)
Result:
sample_df.show(99)
# +-------+----------+----------+---------------+----------+
# |user_id| date| date2|desired_outcome| date3|
# +-------+----------+----------+---------------+----------+
# | 1|2022-01-01| null| 2022-01-05|2022-01-05|
# | 1|2022-01-02| null| 2022-01-05|2022-01-05|
# | 1|2022-01-03| null| 2022-01-05|2022-01-05|
# | 1|2022-01-04| null| 2022-01-05|2022-01-05|
# | 1|2022-01-05|2022-01-05| 2022-01-05|2022-01-05|
# | 1|2022-01-06| null| 2022-01-05|2022-01-05|
# | 1|2022-01-07| null| 2022-01-05|2022-01-05|
# | 2|2022-01-01| null| 2022-01-05|2022-01-05|
# | 2|2022-01-02| null| 2022-01-05|2022-01-05|
# | 2|2022-01-03| null| 2022-01-05|2022-01-05|
# | 2|2022-01-04| null| 2022-01-05|2022-01-05|
# | 2|2022-01-05|2022-01-05| 2022-01-05|2022-01-05|
# | 2|2022-01-06| null| 2022-01-09|2022-01-09|
# | 2|2022-01-07| null| 2022-01-09|2022-01-09|
# | 2|2022-01-08| null| 2022-01-09|2022-01-09|
# | 2|2022-01-09|2022-01-09| 2022-01-09|2022-01-09|
# | 2|2022-01-10| null| 2022-01-09|2022-01-09|
# | 2|2022-01-11| null| 2022-01-09|2022-01-09|
# | 2|2022-01-12| null| 2022-01-09|2022-01-09|
# | 3|2022-01-01|2022-01-01| 2022-01-01|2022-01-01|
# | 3|2022-01-02| null| 2022-01-05|2022-01-05|
# | 3|2022-01-03| null| 2022-01-05|2022-01-05|
# | 3|2022-01-04| null| 2022-01-05|2022-01-05|
# | 3|2022-01-05|2022-01-05| 2022-01-05|2022-01-05|
# | 3|2022-01-06| null| 2022-01-05|2022-01-05|
# | 3|2022-01-07| null| 2022-01-05|2022-01-05|
# | 3|2022-01-08| null| 2022-01-05|2022-01-05|
# | 3|2022-01-09| null| 2022-01-05|2022-01-05|
# | 3|2022-01-10| null| 2022-01-05|2022-01-05|
# | 3|2022-01-11| null| 2022-01-05|2022-01-05|
# | 3|2022-01-12| null| 2022-01-05|2022-01-05|
# +-------+----------+----------+---------------+----------+
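To make the window semantics concrete, here is a minimal pure-Python sketch of the same fill rule applied to a single partition's values in date order. The helper name `fill_next_then_last` is hypothetical (it is not part of the Spark solution above); it mirrors `coalesce(first ignorenulls over the following frame, last ignorenulls over the preceding frame)`:

```python
def fill_next_then_last(values):
    """For each position, take the next non-null value at or after it;
    if none exists ahead, fall back to the most recent non-null before it.
    Mirrors coalesce(F.first(..., True).over(w_following),
                     F.last(..., True).over(w_preceding))."""
    n = len(values)
    # Backward pass: next non-null value at or after each index (the "backfill").
    next_nn = [None] * n
    nxt = None
    for i in range(n - 1, -1, -1):
        if values[i] is not None:
            nxt = values[i]
        next_nn[i] = nxt
    # Forward pass: last non-null value at or before each index (the forward fill).
    last_nn = [None] * n
    prev = None
    for i in range(n):
        if values[i] is not None:
            prev = values[i]
        last_nn[i] = prev
    # Prefer the upcoming value; fall back to the preceding one, like coalesce.
    return [a if a is not None else b for a, b in zip(next_nn, last_nn)]

# date2 for user_id 2 from the sample data, in date order:
date2 = [None, None, None, None, '2022-01-05',
         None, None, None, '2022-01-09', None, None, None]
print(fill_next_then_last(date2))
# → ['2022-01-05'] * 5 followed by ['2022-01-09'] * 7, matching desired_outcome
```

The two passes correspond exactly to the two window frames: `rowsBetween(0, unboundedFollowing)` is the backward iteration, and `rowsBetween(unboundedPreceding, 0)` is the forward iteration.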