I need to add some conditions to the Spark SQL lag function. My data has an id and a date, and I want to get the closest preceding date that is not null.
Input:

| Id  | date       |
|-----|------------|
| ER1 | 2018-01-19 |
| ER1 | null       |
| ER1 | 2018-02-10 |
| ER2 | 2018-11-11 |
| ER2 | null       |
| ER2 | null       |
| ER2 | null       |
The standard lag() function has an ignore nulls option:
select Id, date,
lag(date ignore nulls) over (PARTITION BY id order by date) as last_date
from mytable;
However, not all databases support this. You can emulate it with a subquery:
select Id, date,
       min(date) over (partition by id, grp order by date) as last_date
from (select t.*,
             count(date) over (partition by id order by date) as grp
      from mytable t
     ) t
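For reference, here is a minimal PySpark sketch of the same grouping idea. It assumes an active SparkSession named spark and an explicit ordering column order_col (ordering by the nullable date itself is unreliable), mirroring the test data used further below:

from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('ER1', 1, '2018-01-19'), ('ER1', 2, None), ('ER1', 3, '2018-02-10'),
     ('ER2', 1, '2018-11-11'), ('ER2', 2, None), ('ER2', 3, None), ('ER2', 4, None)],
    ['id', 'order_col', 'date'])

# grp increases by one each time a non-null date is seen, so every null row
# ends up in the same group as the most recent non-null date.
w_order = W.partitionBy("id").orderBy("order_col")
grouped = df.withColumn("grp", F.count("date").over(w_order))

# Within each (id, grp) group the only non-null date is also its minimum.
result = grouped.withColumn("last_date", F.min("date").over(W.partitionBy("id", "grp")))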
First, create a column of lagged values, then use last with the isIgnoreNull parameter set to true.
SELECT id, order_col, date,
last(last_val, true) over (partition by id order by order_col) last_date
FROM (SELECT *, lag(date) over (partition by id order by order_col) last_val
FROM mytable)
Tested in PySpark:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('er1', 1, '2018-01-19'),
('er1', 2, None),
('er1', 3, '2018-02-10'),
('er2', 1, '2018-11-11'),
('er2', 2, None),
('er2', 3, None),
('er2', 4, None)],
['id', 'order_col', 'date']
)
df.createOrReplaceTempView("mytable")
spark.sql(
"""
SELECT id, order_col, date,
last(last_val, true) over (partition by id order by order_col) last_date
FROM (SELECT *, lag(date) over (partition by id order by order_col) last_val
FROM mytable)
"""
).show()
# +---+---------+----------+----------+
# | id|order_col| date| last_date|
# +---+---------+----------+----------+
# |er1| 1|2018-01-19| null|
# |er1| 2| null|2018-01-19|
# |er1| 3|2018-02-10|2018-01-19|
# |er2| 1|2018-11-11| null|
# |er2| 2| null|2018-11-11|
# |er2| 3| null|2018-11-11|
# |er2| 4| null|2018-11-11|
# +---+---------+----------+----------+
The same logic in PySpark instead of SQL:
w = W.partitionBy("id").orderBy("order_col")
df = df.withColumn("last_val", F.lag("date").over(w))
df = df.select(
"id", "order_col", "date",
F.last("last_val", True).over(w).alias("last_date")
)
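As an aside (not part of the original answer), a sketch of an alternative that skips the intermediate last_val column: take last("date", ignorenulls=True) over a frame that ends one row before the current one, so the current row's own date is never picked up. It reuses the F and W imports and the df from above:

w_prev = (W.partitionBy("id")
           .orderBy("order_col")
           .rowsBetween(W.unboundedPreceding, -1))

df_alt = df.select(
    "id", "order_col", "date",
    # Frame covers all preceding rows and excludes the current row.
    F.last("date", ignorenulls=True).over(w_prev).alias("last_date")
)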