如何在pyspark函数内计算不同日期范围的id ?



我有一个pyspark数据框架,名称为h2_df,列为" parsed_date ";(dtype: date)和"id"(dtype: bigint)

+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-21|
|1477469| 2017-12-22|
|1478190| 2017-12-22|
|1478570| 2017-12-22|
|1481415| 2017-12-25|
|1472592| 2017-12-19|
|1474023| 2017-12-20|
+-------+-----------+

我想创建一个函数,在这里我传递一个日期,并在函数内,我想计算id(从在函数外创建的dataframe h2_df)为位于日期范围之间的每个日期。范围1为(day, day+t),范围2为(day+t, day+(2*t), t =5。

我是pyspark的新手,所以下面的代码当然是模糊的,不能工作:

def hypo_2(day):
t = td(days=5)
start_date_before = day 
end_date_before = day+t

start_date_after = day+t
end_date_after = day+(2*t)

cond_1 = (h2_df["parsed_date"] > start_date_before) & (h2_df["parsed_date"] < end_date_before)
cond_2 = (h2_df["parsed_date"] > start_date_after) & (h2_df["parsed_date"] < end_date_after)

df_1 = h2_df.withColumn("count_before", when(cond_1, h2_df.groupBy("parsed_date").agg(count("id"))))
df_2 = h2_df.withColumn("count_after", when(cond_2, h2_df.groupBy("parsed_date").agg(count("id"))))

我想要一个函数,我可以传递任何日期,然后它给我关于日期的每个id的计数,但日期应该只在范围内。每次我调用函数,它都会取日期->创建日期->为每个日期(和每个范围)创建2个数据帧,每个数据帧的计数为id ->返回2个数据帧,其中包含该范围内每个id的计数。

例如:调用hypo_2(2017,12,18)时,函数应该返回df_1和df_2。df_1的期望输出如下所示:

+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|            |
|1472928| 2017-12-19|           1|
|1476917| 2017-12-21|           1|
|1477469| 2017-12-22|           3|
|1478190| 2017-12-22|            |
|1478570| 2017-12-22|            |
+-------+-----------+------------+

请帮助。

您可以使用过滤器来选择感兴趣的时间间隔,并为每个parsed_date添加一列count:

from pyspark.sql import functions as F, Window
def hypo_2(df, day, t):
"""
Example usage: df_list = hypo_2(df, '2017-12-18', 5)
Returns a list of 2 dataframes.
"""
df1 = (df.filter(f"parsed_date between '{day}' and '{day}' + interval {t} days")
.withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
.orderBy('parsed_date')
)
df2 = (df.filter(f"parsed_date between '{day}' + interval {t} days and '{day}' + interval {t*2} days")
.withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
.orderBy('parsed_date')
)
return [df1, df2]

最新更新