I have two tables; let's call them users and events:
users: [user_id, activity_date]
[123, 4-28-2020 ]
[456, 4-27-2020 ]
events: [user_id, event_date]
[123, 5-28-2020 ]
[456, 4-27-2020 ]
[456, 4-25-2020 ]
[456, 4-30-2020 ]
[456, 1-30-2020 ]
[123, 1-28-2020 ]
I want a summary table that shows, for each user, the count of rows in events whose event_date falls on or before the value stored in users.activity_date.
So the example above would produce:
[user_id, total]
[123, 1 ]
[456, 3 ]
I tried doing this with a correlated query on Redshift, but it never finishes (the first table has millions of records and the second has tens of millions)... so my thinking is to use map reduce... but I don't know where to start. I am able to read the tables in pyspark, and that's where I'm stuck.
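For context, here's roughly how I'm reading the tables in (a minimal sketch; the URL, credentials, and driver class are placeholders for my setup, assuming the Redshift JDBC driver jar is on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("event-counts").getOrCreate()

# Placeholder connection details; swap in the real host, database, and credentials.
jdbc_url = "jdbc:redshift://my-cluster:5439/mydb"

def read_table(name):
    # Generic JDBC read; the driver class below is the Redshift JDBC 4.2 driver,
    # adjust it to whatever driver is actually on your classpath.
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("driver", "com.amazon.redshift.jdbc42.Driver")
        .option("dbtable", name)
        .option("user", "my_user")
        .option("password", "my_password")
        .load()
    )

users_df = read_table("users")
events_df = read_table("events")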
You just need a join to check whether event_date is greater than activity_date, then sum:
# create data frames
events_df = spark.createDataFrame(
    [
        ("123", "5-28-2020"),
        ("456", "4-27-2020"),
        ("456", "4-25-2020"),
        ("456", "4-30-2020"),
        ("456", "1-30-2020"),
        ("123", "1-28-2020"),
    ],
    ("user_id", "event_date"),
)
events_df.show()
+-------+----------+
|user_id|event_date|
+-------+----------+
| 123| 5-28-2020|
| 456| 4-27-2020|
| 456| 4-25-2020|
| 456| 4-30-2020|
| 456| 1-30-2020|
| 123| 1-28-2020|
+-------+----------+
users_df = spark.createDataFrame(
    [
        ("123", "4-28-2020"),
        ("456", "4-27-2020"),
    ],
    ("user_id", "activity_date"),
)
users_df.show()
+-------+-------------+
|user_id|activity_date|
+-------+-------------+
| 123| 4-28-2020|
| 456| 4-27-2020|
+-------+-------------+
# Import functions
import pyspark.sql.functions as f
# Join both data frames on user_id
df = events_df.join(
    f.broadcast(users_df),
    events_df.user_id == users_df.user_id,
    how='left_outer',
).select(events_df['user_id'], events_df['event_date'], users_df['activity_date'])
df.show()
+-------+----------+-------------+
|user_id|event_date|activity_date|
+-------+----------+-------------+
| 123| 5-28-2020| 4-28-2020|
| 456| 4-27-2020| 4-27-2020|
| 456| 4-25-2020| 4-27-2020|
| 456| 4-30-2020| 4-27-2020|
| 456| 1-30-2020| 4-27-2020|
| 123| 1-28-2020| 4-28-2020|
+-------+----------+-------------+
# flag each event: 0 if event_date is greater than activity_date, else 1
df1 = df.withColumn(
    'active',
    f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)),
)
df1.show()
+-------+----------+-------------+------+
|user_id|event_date|activity_date|active|
+-------+----------+-------------+------+
| 123| 5-28-2020| 4-28-2020| 0|
| 456| 4-27-2020| 4-27-2020| 1|
| 456| 4-25-2020| 4-27-2020| 1|
| 456| 4-30-2020| 4-27-2020| 0|
| 456| 1-30-2020| 4-27-2020| 1|
| 123| 1-28-2020| 4-28-2020| 1|
+-------+----------+-------------+------+
# then group by and sum
df2 = df1.groupby("user_id").agg(f.sum('active').alias('total'))
df2.show()
+-------+-----+
|user_id|total|
+-------+-----+
| 456| 3|
| 123| 1|
+-------+-----+
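Incidentally, the flag and the sum can be collapsed into a single aggregation; this sketch of the same logic joins on the column name so the date columns stay unambiguous:

df2 = (
    events_df.join(f.broadcast(users_df), 'user_id', how='left_outer')
    .groupby('user_id')
    .agg(
        # sum an inline 0/1 flag instead of materializing the 'active' column first
        f.sum(
            f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(1)
        ).alias('total')
    )
)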
I added a broadcast join since, as you mentioned, one table has millions of records and the other has tens of millions.
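If you want to confirm Spark actually planned it that way, check the physical plan; the explicit f.broadcast() hint applies even when the table exceeds spark.sql.autoBroadcastJoinThreshold:

# The printed physical plan should contain a BroadcastHashJoin node.
df.explain()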
You can also combine df and df1 into a single step, like this:

df = (
    events_df.join(
        f.broadcast(users_df),
        events_df.user_id == users_df.user_id,
        how='left_outer',
    )
    .select(events_df['user_id'], events_df['event_date'], users_df['activity_date'])
    .withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))
)
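One caveat: the dates here are plain strings, so > compares them lexicographically. That happens to work for this sample, but it breaks once months 10-12 show up (e.g. "10-01-2020" sorts before "4-27-2020" as a string). If your real columns are strings too, it's safer to parse them first; this sketch assumes Spark 3.x and an M-d-yyyy input format:

df = (
    events_df.join(f.broadcast(users_df), 'user_id', how='left_outer')
    # parse the M-d-yyyy strings into real dates before comparing
    .withColumn('event_date', f.to_date(f.col('event_date'), 'M-d-yyyy'))
    .withColumn('activity_date', f.to_date(f.col('activity_date'), 'M-d-yyyy'))
    .withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))
)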