有没有办法避免在此 Pyspark 代码中使用"for"循环



我想在数据库中查找每个类型的平均评分和该类型电影的总数。但是,数据库的组织方式如下:columns = [movieId, title, rating, genre],其中列genre是适用于电影的类型列表(最多6项)。我如何转换数据库,使索引列是没有'for'循环的类型,因为目前我正在访问列表中的每个元素。

下面是我的代码:

import pyspark
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import max,split,avg,count,col,sum,concat_ws
spark = SparkSession.builder.appName("APISpark").getOrCreate()

ratings = spark.read.option("header","true").csv("input/ml25m/ratings.csv").drop("userId","timestamp")
movies = spark.read.option("header","true").csv("input/ml-25m/movies.csv")
movies = movies.withColumn('genre', split(movies['genres'], '|') ).drop('genres')
A = movies.join(ratings,ratings["movieId"]==movies["movieId"]).drop('movieId')
F = A.groupBy(col("genre")[0]).agg(sum("rating").alias('s0'), count("title").alias('c0'))
F1 = A.groupBy(col("genre")[1]).agg(sum("rating").alias('s1'), count("title").alias('c1'))
F2 = A.groupBy(col("genre")[2]).agg(sum("rating").alias('s2'), count("title").alias('c2'))
F3 = A.groupBy(col("genre")[3]).agg(sum("rating").alias('s3'), count("title").alias('c3'))
F4 = A.groupBy(col("genre")[4]).agg(sum("rating").alias('s4'), count("title").alias('c4'))
F5 = A.groupBy(col("genre")[5]).agg(sum("rating").alias('s5'), count("title").alias('c5'))
F = F.join(F1,F['genre[0]']==F1['genre[1]'],"left").drop('genre[1]')
.join(F2,F2['genre[2]']==F['genre[0]'],"left").drop('genre[2]')
.join(F3,F3['genre[3]']==F['genre[0]'],"left").drop('genre[3]')
.join(F4,F4['genre[4]']==F['genre[0]'],"left").drop('genre[4]')
.join(F5,F5['genre[5]']==F['genre[0]'],"left").drop('genre[5]').fillna(0)
F = F.select(F[0].alias('Genres'),((F[1]+F[3]+F[5]+F[7]+F[9]+F[11])/(F[2]+F[4]+F[6]+F[8]+F[10]+F[12])).alias('Average_Rating'),(F[2]+F[4]+F[6]+F[8]+F[10]+F[12]).alias('Count'))
F.select(concat_ws(",",col("Genres"),col("Average_Rating"),col("Count")).alias("genre_averagerating_Promedio_reviews")).write.text("3_out")

有一个更好的方法,一个更好的方法

设置
A.show()
+-------+------+------+------------+
|movieId| title|rating|      genres|
+-------+------+------+------------+
|      1|movie1|     6|   [a, b, c]|
|      2|movie2|     2|      [b, c]|
|      3|movie3|     8|         [c]|
|      4|movie4|     6|[a, b, c, d]|
+-------+------+------+------------+

解决方案关键步骤是explode类型列表复制每个类型作为一个单独的行,然后你可以通过genre分组数据帧并进行聚合。

result = (
A
.withColumn('genre', F.explode('genre'))
.groupBy('genre').agg(
F.count('movieId').alias('count'),
F.mean('rating').alias('avg_rating'),
)
)
结果

result.show()
+------+-----+-----------------+
|genre |count|       avg_rating|
+------+-----+-----------------+
|     c|    4|              5.5|
|     b|    3|4.666666666666667|
|     a|    2|              6.0|
|     d|    1|              6.0|
+------+-----+-----------------+

最新更新