I have a question about how to add up the values of a specific column across rows that have the same key in PySpark



I am learning Spark. Is there a way to create and return a DataFrame that combines (sums) the playtime of rows that share the same userId and movieId in the table? Thank you! [screenshot of the sample data]

Although I could not fully make sense of the expected result data, by looking at the attached screenshot I came up with the following solution.

Here I assumed that, from each group of duplicated records, I first have to pick one record, and then aggregate the playtime of the whole group for the final result.

I created two DataFrames from the dataset, df and df1, and joined them.
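For reference, the input DataFrame can be rebuilt from the output shown further down; here is a minimal sketch (the column names and rows are taken from the df1.show() output below, since the original screenshot is not reproduced here):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# sample data matching the df1.show() output below (without the row_number column)
df = spark.createDataFrame(
    [(1, 102, 5, 1, 0, 0),
     (1, 102, 0, 2, 1800, 3600),
     (1, 106, 3, 2, 3600, 3600),
     (2, 202, 2, 2, 1800, 3600),
     (2, 204, 3, 1, 0, 0),
     (3, 306, 5, 1, 0, 0),
     (7, 100, 0, 1, 3600, 3600),
     (7, 700, 0, 2, 100, 3600),
     (7, 700, 0, 2, 500, 3600),
     (7, 700, 0, 2, 600, 3600)],
    ["userid", "movieid", "score", "scoretype", "playtime", "duration"])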

>>> from pyspark.sql import Window
>>> from pyspark.sql.functions import row_number, sum
>>> windowSpec = Window.partitionBy("movieid").orderBy("userid")
>>> df1=df.withColumn("row_number",row_number().over(windowSpec))
>>> df1.show()
+------+-------+-----+---------+--------+--------+----------+
|userid|movieid|score|scoretype|playtime|duration|row_number|
+------+-------+-----+---------+--------+--------+----------+
|     3|    306|    5|        1|       0|       0|         1|
|     2|    202|    2|        2|    1800|    3600|         1|
|     7|    100|    0|        1|    3600|    3600|         1|
|     1|    102|    5|        1|       0|       0|         1|
|     1|    102|    0|        2|    1800|    3600|         2|
|     2|    204|    3|        1|       0|       0|         1|
|     1|    106|    3|        2|    3600|    3600|         1|
|     7|    700|    0|        2|     100|    3600|         1|
|     7|    700|    0|        2|     500|    3600|         2|
|     7|    700|    0|        2|     600|    3600|         3|
+------+-------+-----+---------+--------+--------+----------+
>>> df1=df1.where(df1.row_number==1)
>>> df1.show()
+------+-------+-----+---------+--------+--------+----------+
|userid|movieid|score|scoretype|playtime|duration|row_number|
+------+-------+-----+---------+--------+--------+----------+
|     3|    306|    5|        1|       0|       0|         1|
|     2|    202|    2|        2|    1800|    3600|         1|
|     7|    100|    0|        1|    3600|    3600|         1|
|     1|    102|    5|        1|       0|       0|         1|
|     2|    204|    3|        1|       0|       0|         1|
|     1|    106|    3|        2|    3600|    3600|         1|
|     7|    700|    0|        2|     100|    3600|         1|
+------+-------+-----+---------+--------+--------+----------+
>>> df=df.groupBy("userid","movieid").agg(sum("playtime"))
>>> df.show()
+------+-------+-------------+
|userid|movieid|sum(playtime)|
+------+-------+-------------+
|     1|    102|         1800|
|     2|    202|         1800|
|     7|    100|         3600|
|     2|    204|            0|
|     3|    306|            0|
|     7|    700|         1200|
|     1|    106|         3600|
+------+-------+-------------+
>>> df=df.withColumnRenamed("userid","useriddf").withColumnRenamed("movieid","movieiddf").withColumnRenamed("sum(playtime)","playtime1")
>>> df.show()
+--------+---------+-------------+
|useriddf|movieiddf|playtime1    |
+--------+---------+-------------+
|       1|      102|         1800|
|       2|      202|         1800|
|       7|      100|         3600|
|       2|      204|            0|
|       3|      306|            0|
|       7|      700|         1200|
|       1|      106|         3600|
+--------+---------+-------------+
>>> df_join=df.join(df1,df.movieiddf == df1.movieid,"inner")
>>> df_join.show()
+--------+---------+---------+------+-------+-----+---------+--------+--------+
|useriddf|movieiddf|playtime1|userid|movieid|score|scoretype|playtime|duration|
+--------+---------+---------+------+-------+-----+---------+--------+--------+
|       3|      306|        0|     3|    306|    5|        1|       0|       0|
|       2|      202|     1800|     2|    202|    2|        2|    1800|    3600|
|       7|      100|     3600|     7|    100|    0|        1|    3600|    3600|
|       1|      102|     1800|     1|    102|    5|        1|       0|       0|
|       2|      204|        0|     2|    204|    3|        1|       0|       0|
|       1|      106|     3600|     1|    106|    3|        2|    3600|    3600|
|       7|      700|     1200|     7|    700|    0|        2|     100|    3600|
+--------+---------+---------+------+-------+-----+---------+--------+--------+
>>> df_join.select("userid","movieid","score","scoretype","playtime1","duration").sort("userid").show()
+------+-------+-----+---------+---------+--------+
|userid|movieid|score|scoretype|playtime1|duration|
+------+-------+-----+---------+---------+--------+
|     1|    102|    5|        1|     1800|       0|
|     1|    106|    3|        2|     3600|    3600|
|     2|    202|    2|        2|     1800|    3600|
|     2|    204|    3|        1|        0|       0|
|     3|    306|    5|        1|        0|       0|
|     7|    100|    0|        1|     3600|    3600|
|     7|    700|    0|        2|     1200|    3600|
+------+-------+-----+---------+---------+--------+
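One detail worth noting: the join above matches rows on movieid only, which works here because every movieid in the sample belongs to a single userid. If a movieid could appear under several userids, joining on both keys would be safer, for example:

>>> df_join = df.join(df1, (df.movieiddf == df1.movieid) & (df.useriddf == df1.userid), "inner")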

Hope this helps.

# you can use groupBy + sum function
from pyspark.sql import functions as F
# filter userid==1 or userid==7 (movieid is not necessary because they have distinct userids)
duplicated_user_movie = df.filter((F.col('userid') == 1) | (F.col('userid') == 7))
duplicated_sum = duplicated_user_movie.groupBy('userid', 'movieid', 'score', 'scoretype', 'duration').agg(F.sum('playtime').alias('playtime'))
# put the columns back into the same order as df so the union lines up by position
cols = ['userid', 'movieid', 'score', 'scoretype', 'playtime', 'duration']
duplicated_sum = duplicated_sum.select(*cols)
df = df.filter((F.col('userid') != 1) & (F.col('userid') != 7))
df = df.union(duplicated_sum)  # concat the two dataframes (union matches columns by position)
df = df.orderBy('userid')      # sort by userid
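Note that the groupBy above also keys on score, scoretype and duration, so two duplicated rows that differ in those columns (as userid 1 / movieid 102 does in the sample) are still kept apart. A sketch of an alternative that keeps the first row's values for those columns, applied to the original df before the filter/union above; this matches the result of the window approach and is my assumption about the desired output, not part of the original answer:

# group on the key only; first() keeps an arbitrary row's value per group unless you sort beforehand
result = (df.groupBy('userid', 'movieid')
            .agg(F.first('score').alias('score'),
                 F.first('scoretype').alias('scoretype'),
                 F.sum('playtime').alias('playtime'),
                 F.first('duration').alias('duration'))
            .orderBy('userid'))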
