我有下面的spark dataframe/dataset.
Column_A Column_B
2020-12-31 1
2020-11-02 2
2020-10-01 3
2021-02-01 4
2021-01-05 5
2021-02-10 6
2021-02-11 7
2021-02-26 8
2021-03-01 9
我必须过滤并只保留从脚本执行之日起前一个月的记录。
假设我在2021年1月1日执行spark程序,输出数据帧应该只有2020年12月的记录。如果我今天(3月1日)执行它,那么它应该返回2021年2月以来的所有行。
预期输出:
Column_A Column_B
2021-02-01 4
2021-02-10 6
2021-02-11 7
2021-02-26 8
如何在pyspark中实现
您可以使用current_date
和add_months
函数进行过滤:
from pyspark.sql import functions as F
df1 = df.filter(
(F.month(F.col("Column_A")) == F.month(F.add_months(F.current_date(), -1))) &
(F.year(F.col("Column_A")) == F.year(F.add_months(F.current_date(), -1)))
)
df1.show()
#+----------+--------+
#| Column_A|Column_B|
#+----------+--------+
#|2021-02-01| 4|
#|2021-02-10| 6|
#|2021-02-11| 7|
#|2021-02-26| 8|
#+----------+--------+
使用date_format
:
df1 = df.filter(
F.date_format(F.col("Column_A"), "yyyyMM") == F.date_format(F.add_months(F.current_date(), -1), "yyyyMM")
)
使用date_trunc
:
df1 = df.filter(
F.date_trunc("month", F.col("Column_A")) == F.date_trunc("month", F.add_months(F.current_date(), -1))
)