I have a PySpark DataFrame with the columns parsed_date (dtype: date) and id (dtype: bigint), as shown below:
+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-19|
|1477469| 2017-12-21|
|1478190| 2017-12-21|
|1478570| 2017-12-19|
|1481415| 2017-12-21|
|1472592| 2017-12-20|
|1474023| 2017-12-22|
|1474029| 2017-12-22|
+-------+-----------+
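I have the function below. The idea is to pass in a date (day) and t (a number of days). df1 should count the ids per date in the range (day - t, day), and df2 in the range (day, day + t), i.e. the t days before and the t days after day, without counting day itself.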
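As a plain-Python sketch of the intended semantics (no Spark involved), the snippet below counts ids per date over the sample rows. It assumes, based on the expected output further down, that both windows leave out day itself but keep the outer endpoints, i.e. [day - t, day - 1] and [day + 1, day + t]. `counts_around` is a hypothetical helper name, not part of the original code.

```python
from collections import Counter
from datetime import date, timedelta

# Sample rows copied from the DataFrame above: (id, parsed_date)
ROWS = [
    (1471783, date(2017, 12, 18)), (1471885, date(2017, 12, 18)),
    (1472928, date(2017, 12, 19)), (1476917, date(2017, 12, 19)),
    (1477469, date(2017, 12, 21)), (1478190, date(2017, 12, 21)),
    (1478570, date(2017, 12, 19)), (1481415, date(2017, 12, 21)),
    (1472592, date(2017, 12, 20)), (1474023, date(2017, 12, 22)),
    (1474029, date(2017, 12, 22)),
]


def counts_around(rows, day, t):
    """Hypothetical helper: id counts per date for the t days before `day`
    and the t days after it, with `day` itself excluded from both."""
    one = timedelta(days=1)
    span = timedelta(days=t)

    def count_in(lo, hi):
        # Inclusive on both ends, like SQL BETWEEN
        hits = Counter(d for _, d in rows if lo <= d <= hi)
        return {d.isoformat(): n for d, n in sorted(hits.items())}

    return count_in(day - span, day - one), count_in(day + one, day + span)


before, after = counts_around(ROWS, date(2017, 12, 20), 2)
print(before)  # {'2017-12-18': 2, '2017-12-19': 3}
print(after)   # {'2017-12-21': 3, '2017-12-22': 2}
```

These per-date counts are the numbers the expected df1 and df2 show.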
from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):
    df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}'")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
           )
    df2 = (df.filter(f"parsed_date between '{day}' and '{day}' + interval {t} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
           )
    return [df1, df2]
The function returns two DataFrames:
Example: hypo_1(df, '2017-12-20', 2)
df1
+-----------+-------+------------+
|parsed_date|     id|count_before|
+-----------+-------+------------+
| 2017-12-20|1471783|           1|
+-----------+-------+------------+
df2
+-----------+-------+-----------+
|parsed_date|     id|count_after|
+-----------+-------+-----------+
| 2017-12-20|1472592|          1|
| 2017-12-21|1477469|          3|
| 2017-12-22|1474029|          2|
+-----------+-------+-----------+
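Problems:
The date range of df1 looks wrong.
The ids on the date I pass in (2017-12-20) should not be counted, but they are, in both df1 and df2. For example, in df2:
+-----------+-------+-----------+
|parsed_date|     id|count_after|
+-----------+-------+-----------+
| 2017-12-20|1472592|          1|
+-----------+-------+-----------+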
Expected output:
Example: hypo_1(df, '2017-12-20', 2)
df1:
+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+
df2:
+-------+-----------+------------+
|     id|parsed_date| count_after|
+-------+-----------+------------+
|1477469| 2017-12-21|           3|
|1474023| 2017-12-22|           2|
+-------+-----------+------------+
Please help.
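Change the filter conditions slightly: add - interval 1 day to the upper bound of the first filter and + interval 1 day to the lower bound of the second. BETWEEN includes both of its bounds, so this shift is what keeps day itself out of both results: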
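A quick plain-Python check of those bounds, without Spark. `sql_between` is a hypothetical stand-in that models SQL's `x BETWEEN lo AND hi` as `lo <= x AND x <= hi`; it is not a Spark function.

```python
from datetime import date, timedelta


def sql_between(x, lo, hi):
    # SQL `x BETWEEN lo AND hi` is shorthand for `lo <= x AND x <= hi`
    return lo <= x <= hi


day, t = date(2017, 12, 20), 2
span, one = timedelta(days=t), timedelta(days=1)

# Bounds used by the original hypo_1: `day` is an endpoint of both ranges
print(sql_between(day, day - span, day))   # True
print(sql_between(day, day, day + span))   # True

# Moving each inner bound by one day leaves `day` out of both ranges
print(sql_between(day, day - span, day - one))  # False
print(sql_between(day, day + one, day + span))  # False
```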
from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):
    # BETWEEN includes both bounds, so stop one day before `day`: [day - t, day - 1]
    df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}' - interval 1 day")
             # attach the per-date id count to every row
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
           )
    # ...and start one day after `day`: [day + 1, day + t]
    df2 = (df.filter(f"parsed_date between '{day}' + interval 1 day and '{day}' + interval {t} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
           )
    return [df1, df2]
df1, df2 = hypo_1(df, '2017-12-20', 2)
df1.show()
+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|           2|
|1472928| 2017-12-19|           3|
|1476917| 2017-12-19|           3|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+
df2.show()
+-------+-----------+-----------+
|     id|parsed_date|count_after|
+-------+-----------+-----------+
|1481415| 2017-12-21|          3|
|1478190| 2017-12-21|          3|
|1477469| 2017-12-21|          3|
|1474023| 2017-12-22|          2|
|1474029| 2017-12-22|          2|
+-------+-----------+-----------+
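Every row still carries its date's count because F.count('id').over(Window.partitionBy('parsed_date')) is a window aggregate: it attaches the size of the row's partition to each row instead of collapsing the partition to a single row. The plain-Python sketch below mimics both behaviours on the df1 rows above; `window_count` and `group_count` are hypothetical helpers, not PySpark APIs.

```python
from collections import Counter

# df1 rows from the output above: (id, parsed_date)
DF1_ROWS = [
    (1471783, "2017-12-18"), (1471885, "2017-12-18"),
    (1472928, "2017-12-19"), (1476917, "2017-12-19"), (1478570, "2017-12-19"),
]


def window_count(rows):
    """Mimic count(...) over a window partitioned by date: one output row per input row."""
    per_date = Counter(d for _, d in rows)
    return [(i, d, per_date[d]) for i, d in rows]


def group_count(rows):
    """Mimic a group-by-date count: one output row per date."""
    return sorted(Counter(d for _, d in rows).items())


print(window_count(DF1_ROWS))  # 5 rows, each tagged with its date's count
print(group_count(DF1_ROWS))   # [('2017-12-18', 2), ('2017-12-19', 3)]
```

In PySpark the collapsed form would be something along the lines of df.groupBy('parsed_date').agg(F.count('id').alias('count_before')), which gives one row per date but no longer has an id column.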
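If you want exactly the output you expected, you can then drop the duplicate rows, for example:
df1 = df1.dropDuplicates(['parsed_date', 'count_before'])
df2 = df2.dropDuplicates(['parsed_date', 'count_after'])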
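Keep in mind that dropDuplicates keeps one row per key, but Spark does not document which one, so the surviving id (for example 1478570 versus 1472928 for 2017-12-19) may differ from the expected output. The plain-Python sketch below models this with a hypothetical keep-first `drop_duplicates` helper; that keep-first rule is an assumption for illustration, not Spark's actual selection logic. The (parsed_date, count) pairs come out the same, while the surviving ids depend only on row order.

```python
# df1 rows after the window count: (id, parsed_date, count_before)
DEDUP_ROWS = [
    (1471783, "2017-12-18", 2), (1471885, "2017-12-18", 2),
    (1472928, "2017-12-19", 3), (1476917, "2017-12-19", 3),
    (1478570, "2017-12-19", 3),
]


def drop_duplicates(rows, key):
    """Hypothetical keep-first dedup keyed on key(row); only a model of
    dropDuplicates, whose surviving row Spark does not specify."""
    seen, kept = set(), []
    for row in rows:
        k = key(row)
        if k not in seen:
            seen.add(k)
            kept.append(row)
    return kept


def by_date_and_count(row):
    return (row[1], row[2])


forward = drop_duplicates(DEDUP_ROWS, by_date_and_count)
backward = drop_duplicates(list(reversed(DEDUP_ROWS)), by_date_and_count)

# Same (parsed_date, count_before) pairs either way...
print(sorted(by_date_and_count(r) for r in forward))   # [('2017-12-18', 2), ('2017-12-19', 3)]
print(sorted(by_date_and_count(r) for r in backward))  # [('2017-12-18', 2), ('2017-12-19', 3)]
# ...but different surviving ids, depending only on row order
print(sorted(r[0] for r in forward))   # [1471783, 1472928]
print(sorted(r[0] for r in backward))  # [1471885, 1478570]
```

If a particular id has to be kept, it needs to be chosen explicitly (for example with an aggregate such as F.min('id')) rather than left to dropDuplicates.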