我有一个pyspark数据帧,看起来像这样:
key key2 category ip_address
1 a desktop 111
1 a desktop 222
1 b desktop 333
1 c mobile 444
2 d cell 555
我想groupBy
key
得到唯一ip_addr
的总数,以及唯一key_2
的总数,然后是每个category
贡献的唯一ip_address
的数量(假设category
中的值是恒定的,所以category
的值只能是[桌面、移动、蜂窝](。
因此,我正在寻找这样一个结果数据帧:
key num_ips num_key2 num_desktop num_mobile num_cell
1 4 3 3 1 0
2 1 1 0 0 0
我一直在尝试这样的代码,但num_desktop
、num_mobile
、num_cell
的代码不太正确。
import pyspark.sql.functions as F
df_agg = df.groupBy('key1')
.agg(F.countDistinct('ip_addr').alias('num_ips'),
F.countDistinct('key_2').alias('num_key2'),
F.countDistinct('ip_addr').where(F.col('category')=='desktop').alias('num_desktop'),
F.countDistinct('ip_addr').where(F.col('category')=='mobile').alias('num_mobile'),
F.countDistinct('ip_addr').where(F.col('category')=='cell').alias('num_cell')))
我是否必须执行某种类型的嵌套groupBy
,或者可能执行Window
函数?非常感谢您的帮助!
我不得不拆分数据帧,并将它们连接回桌面、移动和单元格计数
df1 = df.groupBy('key')
.agg(F.countDistinct('ip_address').alias('num_ips'),
F.countDistinct('key2').alias('num_key2'))
de = df.filter(col("category")=="desktop").groupBy('key')
.agg(F.countDistinct('ip_address').alias('num_desktop')).withColumnRenamed("key", "key1")
dm = df.filter(col("category")=="mobile").groupBy('key')
.agg(F.countDistinct('ip_address').alias('num_mobile')).withColumnRenamed("key", "key1")
dc = df.filter(col("category")=="cell").groupBy('key')
.agg(F.countDistinct('ip_address').alias('num_cell')).withColumnRenamed("key", "key1")
join_df = df1.join(de, (df.key == de.key1), "left").drop("key1")
.join(dm, (df.key == dm.key1), "left").drop("key1")
.join(dc, (df.key == dc.key1), "left").drop("key1")
.fillna(0).drop('category', 'ip_address')
输出:
+---+-------+--------+-----------+----------+--------+
|key|num_ips|num_key2|num_desktop|num_mobile|num_cell|
+---+-------+--------+-----------+----------+--------+
| 1| 4| 3| 3| 1| 0|
| 2| 1| 1| 0| 0| 1|
+---+-------+--------+-----------+----------+--------+