如何编写pyspark映射reduce来计算日期之前的事件数



我有两个表,我们称它们为用户、事件

users: [user_id, activity_date]
[123,     4-28-2020    ]
[456,     4-27-2020    ]
events: [user_id, event_date]
[123,     5-28-2020    ]
[456,     4-27-2020    ]
[456,     4-25-2020    ]
[456,     4-30-2020    ]
[456,     1-30-2020    ]
[123,     1-28-2020    ]

我想得到一个汇总表,它为每个用户显示在users.activity_date.中存储的值之前事件表中的事件计数

因此,上面的例子将产生:

[user_id, total]
[123,       1  ]
[456,       3  ]    

我尝试使用相关查询并在红移上执行此操作,但它并不完整(第一个表中有数百万条记录,第二个表中有数千万条记录(。。。所以我的想法是使用map reduce。。。但我不知道从哪里开始。我能读懂pyspark中的表格,这就是我被卡住的地方。

您只需要一个join,就可以找到event_date是否大于activity_datesum

# create data frames
events_df = spark.createDataFrame(
[
("123","5-28-2020"),
("456","4-27-2020"),
("456","4-25-2020"),
("456","4-30-2020"),
("456","1-30-2020"),
("123","1-28-2020")], 
("user_id","event_date"))
events_df.show()
+-------+----------+
|user_id|event_date|
+-------+----------+
|    123| 5-28-2020|
|    456| 4-27-2020|
|    456| 4-25-2020|
|    456| 4-30-2020|
|    456| 1-30-2020|
|    123| 1-28-2020|
+-------+----------+

users_df = spark.createDataFrame(
[
("123","4-28-2020"),
("456","4-27-2020")], 
("user_id","activity_date"))
users_df.show()
+-------+-------------+
|user_id|activity_date|
+-------+-------------+
|    123|    4-28-2020|
|    456|    4-27-2020|
+-------+-------------+
# Import functions
import pyspark.sql.functions as f
# Join both data frames on user_id
df = events_df.join(f.broadcast(users_df), events_df.user_id == users_df.user_id, how='left_outer').select(events_df['user_id'], events_df['event_date'], users_df['activity_date'])
df.show()
+-------+----------+-------------+
|user_id|event_date|activity_date|
+-------+----------+-------------+
|    123| 5-28-2020|    4-28-2020|
|    456| 4-27-2020|    4-27-2020|
|    456| 4-25-2020|    4-27-2020|
|    456| 4-30-2020|    4-27-2020|
|    456| 1-30-2020|    4-27-2020|
|    123| 1-28-2020|    4-28-2020|
+-------+----------+-------------+
# find if event_date greater than activity_date if yes then assign zero else 1
df1 = df.withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))
df1.show()
+-------+----------+-------------+------+
|user_id|event_date|activity_date|active|
+-------+----------+-------------+------+
|    123| 5-28-2020|    4-28-2020|     0|
|    456| 4-27-2020|    4-27-2020|     1|
|    456| 4-25-2020|    4-27-2020|     1|
|    456| 4-30-2020|    4-27-2020|     0|
|    456| 1-30-2020|    4-27-2020|     1|
|    123| 1-28-2020|    4-28-2020|     1|
+-------+----------+-------------+------+
# then group by and sum
df2 = df1.groupby("user_id").agg(f.sum('active').alias('total'))
df2.show()
+-------+-----+
|user_id|total|
+-------+-----+
|    456|    3|
|    123|    1|
+-------+-----+

我添加了一个broadcast联接,正如您所说的,一个表有million记录,另一个有tens of millions记录。

您也可以将dfdf1组合为如下

df = events_df.join(f.broadcast(users_df), events_df.user_id == users_df.user_id, how='left_outer').select(events_df['user_id'], events_df['event_date'], users_df['activity_date']).withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))

相关内容

  • 没有找到相关文章