我想在数据库中查找每个类型的平均评分和该类型电影的总数。但是,数据库的组织方式如下:columns = [movieId, title, rating, genre],其中列genre是适用于电影的类型列表(最多6项)。我如何转换数据库,使索引列是没有'for'循环的类型,因为目前我正在访问列表中的每个元素。
下面是我的代码:
import pyspark
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import max,split,avg,count,col,sum,concat_ws
spark = SparkSession.builder.appName("APISpark").getOrCreate()
ratings = spark.read.option("header","true").csv("input/ml25m/ratings.csv").drop("userId","timestamp")
movies = spark.read.option("header","true").csv("input/ml-25m/movies.csv")
movies = movies.withColumn('genre', split(movies['genres'], '|') ).drop('genres')
A = movies.join(ratings,ratings["movieId"]==movies["movieId"]).drop('movieId')
F = A.groupBy(col("genre")[0]).agg(sum("rating").alias('s0'), count("title").alias('c0'))
F1 = A.groupBy(col("genre")[1]).agg(sum("rating").alias('s1'), count("title").alias('c1'))
F2 = A.groupBy(col("genre")[2]).agg(sum("rating").alias('s2'), count("title").alias('c2'))
F3 = A.groupBy(col("genre")[3]).agg(sum("rating").alias('s3'), count("title").alias('c3'))
F4 = A.groupBy(col("genre")[4]).agg(sum("rating").alias('s4'), count("title").alias('c4'))
F5 = A.groupBy(col("genre")[5]).agg(sum("rating").alias('s5'), count("title").alias('c5'))
F = F.join(F1,F['genre[0]']==F1['genre[1]'],"left").drop('genre[1]')
.join(F2,F2['genre[2]']==F['genre[0]'],"left").drop('genre[2]')
.join(F3,F3['genre[3]']==F['genre[0]'],"left").drop('genre[3]')
.join(F4,F4['genre[4]']==F['genre[0]'],"left").drop('genre[4]')
.join(F5,F5['genre[5]']==F['genre[0]'],"left").drop('genre[5]').fillna(0)
F = F.select(F[0].alias('Genres'),((F[1]+F[3]+F[5]+F[7]+F[9]+F[11])/(F[2]+F[4]+F[6]+F[8]+F[10]+F[12])).alias('Average_Rating'),(F[2]+F[4]+F[6]+F[8]+F[10]+F[12]).alias('Count'))
F.select(concat_ws(",",col("Genres"),col("Average_Rating"),col("Count")).alias("genre_averagerating_Promedio_reviews")).write.text("3_out")
有一个更好的方法,一个更好的方法
设置A.show()
+-------+------+------+------------+
|movieId| title|rating| genres|
+-------+------+------+------------+
| 1|movie1| 6| [a, b, c]|
| 2|movie2| 2| [b, c]|
| 3|movie3| 8| [c]|
| 4|movie4| 6|[a, b, c, d]|
+-------+------+------+------------+
解决方案关键步骤是explode
类型列表复制每个类型作为一个单独的行,然后你可以通过genre
分组数据帧并进行聚合。
result = (
A
.withColumn('genre', F.explode('genre'))
.groupBy('genre').agg(
F.count('movieId').alias('count'),
F.mean('rating').alias('avg_rating'),
)
)
结果result.show()
+------+-----+-----------------+
|genre |count| avg_rating|
+------+-----+-----------------+
| c| 4| 5.5|
| b| 3|4.666666666666667|
| a| 2| 6.0|
| d| 1| 6.0|
+------+-----+-----------------+