我得到这个错误
line 23, in parseRating
IndexError: list index out of range
…任何对.collect()
,.count()
等的尝试。所以最后一行df3.collect()
抛出了那个错误,但是所有.show()
的工作。我不认为这是数据的问题,但我可能错了。
第一次,真的不知道发生了什么。如有任何帮助,不胜感激。
import os
from os import remove, removedirs
from os.path import join, isfile, dirname
from pyspark.sql.functions import col, explode
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from pyspark.sql import SparkSession
def parseRating(line):
"""
Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
"""
fields = line.strip().split("::")
return int(fields[3]), int(fields[0]), int(fields[1]), float(fields[2])
#return int(fields[0]), int(fields[1]), float(fields[2])
if __name__ == "__main__":
# set up environment
spark = SparkSession.builder
.master("local")
.appName("Movie Recommendation Engine")
.config("spark.driver.memory", "16g")
.getOrCreate()
sc = spark.sparkContext
# load personal ratings
#myRatings = loadRatings(os.path.abspath('personalRatings.txt'))
myRatingsRDD = sc.textFile("personalRatings.txt").map(parseRating)
ratings = sc.textFile("ratings.dat").map(parseRating)
df1 = spark.createDataFrame(myRatingsRDD,["timestamp","userID","movieID","rating"])
df1.show()
df2 = spark.createDataFrame(ratings,["timestamp","userID","movieID","rating"])
df2.show()
df3 = df1.union(df2)
df3.show()
df3.printSchema()
df3 = df3.
withColumn('userID', col('userID').cast('integer')).
withColumn('movieID', col('movieID').cast('integer')).
withColumn('rating', col('rating').cast('float')).
drop('timestamp')
df3.show()
ratings = df3
df3.collect()
错误来自函数parseRating
,它是关于列表索引超出范围。可能数据中的某些行在被::
分隔符分隔后没有预期的字段数。
如何将文本文件直接导入到指定字段分隔符和标题为true/false的数据框架中,并使用cast
修改列的数据类型。
像这样:
df1 = spark.read.format("csv")
.option("header", "true")
.option("delimiter", "::")
.load("personalRatings.txt")
df1 = df1.select(df1.timestamp.cast("int"),df1.userId.cast("int"),df1.movieId.cast("int"),df1.rating.cast("float"))
df1.show(10)
文本文件中的一行可能格式错误/不完整,因此split("::")
可能无法生成预期字段的数量。您可以更新函数,以便在尝试访问索引之前检查分割的数量。如:
def parseRating(line):
"""
Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
"""
fields = line.strip().split("::")
timestamp = int(fields[3]) if len(fields)>3 else None
userId = int(fields[0]) if len(fields)>0 else None
movieId = int(fields[1]) if len(fields)>1 else None
rating = float(fields[2]) if len(fields)>2 else None
return timestamp, userId, movieId, rating
如果需要的话,你甚至可以做更多的异常处理。
让我知道这是否适合你。