I am learning Spark. Is there a way to create and return a DataFrame that combines the playtime across rows in which both userId and movieId are duplicated in the table? Thank you!
Although I could not fully make out the intended result, by looking at the attached screenshot I came up with the following solution. Here I assume that, for the final result, the playtime of the duplicated records has to be aggregated first. I created two DataFrames from the dataset, df and df1, and joined them.
>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import row_number, sum
>>> windowSpec = Window.partitionBy("movieid").orderBy("userid")
>>> df1=df.withColumn("row_number",row_number().over(windowSpec))
>>> df1.show()
+------+-------+-----+---------+--------+--------+----------+
|userid|movieid|score|scoretype|playtime|duration|row_number|
+------+-------+-----+---------+--------+--------+----------+
| 3| 306| 5| 1| 0| 0| 1|
| 2| 202| 2| 2| 1800| 3600| 1|
| 7| 100| 0| 1| 3600| 3600| 1|
| 1| 102| 5| 1| 0| 0| 1|
| 1| 102| 0| 2| 1800| 3600| 2|
| 2| 204| 3| 1| 0| 0| 1|
| 1| 106| 3| 2| 3600| 3600| 1|
| 7| 700| 0| 2| 100| 3600| 1|
| 7| 700| 0| 2| 500| 3600| 2|
| 7| 700| 0| 2| 600| 3600| 3|
+------+-------+-----+---------+--------+--------+----------+
>>> df1=df1.where(df1.row_number==1)
>>> df1.show()
+------+-------+-----+---------+--------+--------+----------+
|userid|movieid|score|scoretype|playtime|duration|row_number|
+------+-------+-----+---------+--------+--------+----------+
| 3| 306| 5| 1| 0| 0| 1|
| 2| 202| 2| 2| 1800| 3600| 1|
| 7| 100| 0| 1| 3600| 3600| 1|
| 1| 102| 5| 1| 0| 0| 1|
| 2| 204| 3| 1| 0| 0| 1|
| 1| 106| 3| 2| 3600| 3600| 1|
| 7| 700| 0| 2| 100| 3600| 1|
+------+-------+-----+---------+--------+--------+----------+
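For readers without a Spark shell at hand, the row_number-based deduplication above can be sketched in plain Python. This is an illustration of what the window function computes, not Spark code; the sample rows are copied from the table above:

```python
from itertools import groupby

# (userid, movieid, score, scoretype, playtime, duration) rows from the table
rows = [
    (3, 306, 5, 1, 0, 0),
    (2, 202, 2, 2, 1800, 3600),
    (7, 100, 0, 1, 3600, 3600),
    (1, 102, 5, 1, 0, 0),
    (1, 102, 0, 2, 1800, 3600),
    (2, 204, 3, 1, 0, 0),
    (1, 106, 3, 2, 3600, 3600),
    (7, 700, 0, 2, 100, 3600),
    (7, 700, 0, 2, 500, 3600),
    (7, 700, 0, 2, 600, 3600),
]

# partitionBy("movieid").orderBy("userid") followed by row_number == 1
# keeps only the first row of each movieid partition
rows_sorted = sorted(rows, key=lambda r: (r[1], r[0]))
first_per_movie = [next(grp) for _, grp in groupby(rows_sorted, key=lambda r: r[1])]
```

Note that when several rows share the same (movieid, userid) key, which one row_number ranks first is not guaranteed by the orderBy; here Python's stable sort keeps the original input order within ties.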
>>> df=df.groupBy("userid","movieid").agg(sum("playtime"))
>>> df.show()
+------+-------+-------------+
|userid|movieid|sum(playtime)|
+------+-------+-------------+
| 1| 102| 1800|
| 2| 202| 1800|
| 7| 100| 3600|
| 2| 204| 0|
| 3| 306| 0|
| 7| 700| 1200|
| 1| 106| 3600|
+------+-------+-------------+
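The groupBy + sum step amounts to accumulating playtime per (userid, movieid) key. A minimal dict-based sketch of what Spark computes here, with the playtime values copied from the original table:

```python
from collections import defaultdict

# (userid, movieid, playtime) triples taken from the original table
rows = [
    (3, 306, 0), (2, 202, 1800), (7, 100, 3600),
    (1, 102, 0), (1, 102, 1800), (2, 204, 0),
    (1, 106, 3600), (7, 700, 100), (7, 700, 500), (7, 700, 600),
]

# equivalent of df.groupBy("userid", "movieid").agg(sum("playtime"))
playtime_sums = defaultdict(int)
for userid, movieid, playtime in rows:
    playtime_sums[(userid, movieid)] += playtime
```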
>>> df=df.withColumnRenamed("userid","useriddf").withColumnRenamed("movieid","movieiddf").withColumnRenamed("sum(playtime)","playtime1")
>>> df.show()
+--------+---------+-------------+
|useriddf|movieiddf|    playtime1|
+--------+---------+-------------+
| 1| 102| 1800|
| 2| 202| 1800|
| 7| 100| 3600|
| 2| 204| 0|
| 3| 306| 0|
| 7| 700| 1200|
| 1| 106| 3600|
+--------+---------+-------------+
>>> df_join=df.join(df1,(df.useriddf == df1.userid) & (df.movieiddf == df1.movieid),"inner")
>>> df_join.show()
+--------+---------+---------+------+-------+-----+---------+--------+--------+
|useriddf|movieiddf|playtime1|userid|movieid|score|scoretype|playtime|duration|
+--------+---------+---------+------+-------+-----+---------+--------+--------+
| 3| 306| 0| 3| 306| 5| 1| 0| 0|
| 2| 202| 1800| 2| 202| 2| 2| 1800| 3600|
| 7| 100| 3600| 7| 100| 0| 1| 3600| 3600|
| 1| 102| 1800| 1| 102| 5| 1| 0| 0|
| 2| 204| 0| 2| 204| 3| 1| 0| 0|
| 1| 106| 3600| 1| 106| 3| 2| 3600| 3600|
| 7| 700| 1200| 7| 700| 0| 2| 100| 3600|
+--------+---------+---------+------+-------+-----+---------+--------+--------+
>>> df_join.select("userid","movieid","score","scoretype","playtime1","duration").sort("userid").show()
+------+-------+-----+---------+---------+--------+
|userid|movieid|score|scoretype|playtime1|duration|
+------+-------+-----+---------+---------+--------+
| 1| 102| 5| 1| 1800| 0|
| 1| 106| 3| 2| 3600| 3600|
| 2| 202| 2| 2| 1800| 3600|
| 2| 204| 3| 1| 0| 0|
| 3| 306| 5| 1| 0| 0|
| 7| 100| 0| 1| 3600| 3600|
| 7| 700| 0| 2| 1200| 3600|
+------+-------+-----+---------+---------+--------+
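The final join-and-select is essentially a keyed lookup: each deduplicated row has its playtime replaced by the aggregated sum for its (userid, movieid) key. A plain-Python sketch of that combination, with the values copied from the tables above:

```python
# deduplicated rows: (userid, movieid, score, scoretype, playtime, duration)
dedup = [
    (3, 306, 5, 1, 0, 0),
    (2, 202, 2, 2, 1800, 3600),
    (7, 100, 0, 1, 3600, 3600),
    (1, 102, 5, 1, 0, 0),
    (2, 204, 3, 1, 0, 0),
    (1, 106, 3, 2, 3600, 3600),
    (7, 700, 0, 2, 100, 3600),
]

# aggregated playtime per (userid, movieid), from the groupBy step
sums = {(1, 102): 1800, (2, 202): 1800, (7, 100): 3600, (2, 204): 0,
        (3, 306): 0, (7, 700): 1200, (1, 106): 3600}

# inner join on (userid, movieid), swapping in the summed playtime,
# then sort by userid as in the final select
result = sorted(
    (u, m, s, st, sums[(u, m)], d)
    for (u, m, s, st, _pt, d) in dedup
)
```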
Hope this helps.
# you can use groupBy + the sum function
from pyspark.sql import functions as F

# filter if userid==1 or userid==7 (movieid is not necessary because they have distinct userids)
# note: combine Column conditions with |; Python's `or` does not work on Column objects
duplicated_user_movie = df.filter((F.col('userid') == 1) | (F.col('userid') == 7))
duplicated_sum = duplicated_user_movie.groupBy('userid', 'movieid', 'score', 'scoretype', 'duration').agg(F.sum('playtime').alias('playtime'))
cols = ['userid', 'movieid', 'score', 'scoretype', 'playtime', 'duration']
duplicated_sum = duplicated_sum.select(*cols)  # reorder columns to match df; toDF(*cols) would only rename them in place
df = df.filter((F.col('userid')!=1) & (F.col('userid')!=7))
df = df.union(duplicated_sum) # concat two dataframes
df = df.orderBy('userid') # sort by userid
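Hardcoding userid 1 and 7 only works for this sample. The same result can be obtained generically by combining every duplicated (userid, movieid) key, keeping the first row's other columns and summing playtime. A plain-Python sketch of that logic for illustration (not the Spark code above):

```python
# (userid, movieid, score, scoretype, playtime, duration) rows from the table
rows = [
    (3, 306, 5, 1, 0, 0),
    (2, 202, 2, 2, 1800, 3600),
    (7, 100, 0, 1, 3600, 3600),
    (1, 102, 5, 1, 0, 0),
    (1, 102, 0, 2, 1800, 3600),
    (2, 204, 3, 1, 0, 0),
    (1, 106, 3, 2, 3600, 3600),
    (7, 700, 0, 2, 100, 3600),
    (7, 700, 0, 2, 500, 3600),
    (7, 700, 0, 2, 600, 3600),
]

# combine rows sharing a (userid, movieid) key: keep the first row's
# score/scoretype/duration, accumulate playtime across all duplicates
combined = {}
for u, m, s, st, pt, d in rows:
    if (u, m) in combined:
        prev = combined[(u, m)]
        combined[(u, m)] = (u, m, prev[2], prev[3], prev[4] + pt, prev[5])
    else:
        combined[(u, m)] = (u, m, s, st, pt, d)

result = sorted(combined.values())  # sort by userid, then movieid
```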