Analysis of movie rating percentages across occupation and movie genre



I have just started learning Spark and Python programming: can you help me understand the mistake in my code?

I am running the code interactively in a Jupyter notebook.

  1. The test code below, where I tried out the concept, works fine:

    rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))])
    result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]), (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]), (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11])))
    print (result.top(3))
    

    Output:

    [('librarian', (2, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0))]
    
  2. The following also works fine:

    #[(movieid, genre_list)]
    aggregateRDD = movieRDD.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1])))
    print (aggregateRDD.top(3))
    

    Output:

    [(1682, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1681, [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1680, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])]
    
  3. However, when I use a similar concept in my program, it does not work. What am I doing wrong?

    ##############################################################################
    ### Analysis of Movie Ratings percentages across Occupation and Movie Genre
    ##############################################################################
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setMaster("local").setAppName("popularMovie")
    sc = SparkContext(conf =conf)
    ###import movie ratings into RDD
    ratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")
    ###import user details into RDD
    userLines = sc.textFile("///SparkCourse/ml-100k/u.user")
    ###import movie data into RDD
    movieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")
    ###import genre data into RDD
    genreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")
    ###split on delimiter functions
    def splitRatingTab(line):
        line = line.split('\t')
        return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)
    def splitUserPipe(line):
        line = line.split('|')
        return (int(line[0]), line[3]) #(user, occupation)
    def splitMoviePipe(line):
        line = line.split('|')
        return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])
    
    def listToIntElements(lst):
        """convert the boolean text ('0', '1') genre values to integers (0, 1)"""
        for cnt, _ in enumerate(lst):
            lst[cnt] = int(_)
        return lst
    ###create dictionary object for genreid and genre
    def loadMovieGenre():
        """
        create dictionary object for genreid and genre
        """
        genre = {}
        with open('C:/SparkCourse/ml-100k/u.genre') as file:
            for line in file:
                #each line is of type [genre, genreid]
                line = line.split('|')
                #convert genreid to int, to remove the newline '\n' at the end of the string
                genre[int(line[1])] = line[0]
            return genre
    
    ### Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers
    ratingRDD = ratingLines.map(lambda line: splitRatingTab(line))
    ### Transform to RDD as [(user, occupation)]
    occupationRDD = userLines.map(splitUserPipe)
    ### Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres
    movieRDD = movieLines.map(splitMoviePipe)
    ###join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre] to get all genres; 
    ###then Transform to [(movieid,((userid, rating), genre) )]
    joinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)
    
    ###Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]
    ###to Transform to [(occupation, ((1, genre)))]
    transRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))
    joinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))
    print (joinRatingGenresOccup.take(2))
    
    ###Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]
    totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))
    print (totalRatingGenreCntByOccupation.take(2))
    

    Error:

    [('librarian', (1, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))]
    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-137-a156d8bbfde9> in <module>()
    ----> 1 get_ipython().run_cell_magic('time', '', '...')
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
    2113             magic_arg_s = self.var_expand(line, stack_depth)
    2114             with self.builtin_trap:
    -> 2115                 result = fn(magic_arg_s, cell)
    2116             return result
    2117 
    <decorator-gen-60> in time(self, line, cell, local_ns)
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
    --> 188         call = lambda f, *a, **k: f(*a, **k)
    189 
    190         if callable(arg):
    C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
    1183         else:
    1184             st = clock2()
    -> 1185             exec(code, glob, local_ns)
    1186             end = clock2()
    1187             out = None
    <timed exec> in <module>()
    C:\spark\python\pyspark\rdd.py in take(self, num)
    1356 
    1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
    -> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
    1359 
    1360             items += res
    C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    999         # SparkContext#runJob.
    1000         mappedRDD = rdd.mapPartitions(partitionFunc)
    -> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    1003 
    C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
    1158         answer = self.gateway_client.send_command(command)
    1159         return_value = get_return_value(
    -> 1160             answer, self.gateway_client, self.target_id, self.name)
    1161 
    1162         for temp_arg in temp_args:
    C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
    61     def deco(*a, **kw):
    62         try:
    ---> 63             return f(*a, **kw)
    64         except py4j.protocol.Py4JJavaError as e:
    65             s = e.java_exception.toString()
    C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
    --> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 239.0 failed 1 times, most recent failure: Lost task 1.0 in stage 239.0 (TID 447, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "C:sparkpythonlibpyspark.zippysparkworker.py", line 229, in main
    File "C:sparkpythonlibpyspark.zippysparkworker.py", line 224, in process
    File "C:sparkpythonpysparkrdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
    File "C:sparkpythonpysparkrdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
    File "C:sparkpythonpysparkrdd.py", line 362, in func
    return f(iterator)
    File "C:sparkpythonpysparkrdd.py", line 1857, in combineLocally
    merger.mergeValues(iterator)
    File "C:sparkpythonlibpyspark.zippysparkshuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
    File "<timed exec>", line 73, in <lambda>
    TypeError: 'int' object is not subscriptable
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "C:sparkpythonlibpyspark.zippysparkworker.py", line 229, in main
    File "C:sparkpythonlibpyspark.zippysparkworker.py", line 224, in process
    File "C:sparkpythonpysparkrdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
    File "C:sparkpythonpysparkrdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
    File "C:sparkpythonpysparkrdd.py", line 362, in func
    return f(iterator)
    File "C:sparkpythonpysparkrdd.py", line 1857, in combineLocally
    merger.mergeValues(iterator)
    File "C:sparkpythonlibpyspark.zippysparkshuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
    File "<timed exec>", line 73, in <lambda>
    TypeError: 'int' object is not subscriptable
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more 
    

When you reduceByKey, you must return the same structure that you receive; otherwise, the next time your function gets a value for the same key and tries to reduce it, it will not work.
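
To see why, here is a minimal local sketch using plain functools.reduce (no Spark; the values, bad and good names are only illustrative) of how pairwise folding feeds the previous result back in as the next x:

    from functools import reduce

    # values for one key; the reducer is applied pairwise, so the result of one
    # call becomes the 'x' of the next call
    values = [(1, [0, 0, 1, 0]), (1, [0, 1, 0, 0]), (1, [0, 1, 0, 0])]

    def bad(x, y):
        # returns a flat tuple instead of (count, list): the second call then
        # tries to index into an int and fails
        return (x[0] + y[0], x[1][0] + y[1][0], x[1][1] + y[1][1],
                x[1][2] + y[1][2], x[1][3] + y[1][3])

    def good(x, y):
        # keeps the (count, genre_list) shape it received
        return (x[0] + y[0], [a + b for a, b in zip(x[1], y[1])])

    print(reduce(good, values))   # (3, [0, 2, 1, 0])
    # reduce(bad, values) raises: TypeError: 'int' object is not subscriptable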

You only tested it with two elements, so you have not hit it yet, but try it with three...:

    rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),
                          ('librarian', (1, [0, 1, 0, 0]))])
    result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]),
                             (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3])))

..... 文件 "/home/hado/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1784年,_mergeCombiners年 merger.mergeCombiners(iterator) File "/home/hado/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/shuffle.py", 第 272 行,在合并合路器中 d[k] = comb(d[k], v) 如果 k 在 d else v File ", 第 3 行, 在 TypeError 中: 'int' 对象没有属性 'getitem' ......

The correct way to do the reduceByKey in your code is to return the same tuple shape: the count plus a list of the same size:

    rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),
                          ('librarian', (1, [0, 1, 0, 0]))])
    result = rdd.reduceByKey(lambda x, y: (x[0] + y[0],
                             [x[1][0]+y[1][0], x[1][1]+y[1][1], x[1][2]+y[1][2], x[1][3]+y[1][3]]))
    print (result.collect())

[("图书馆员", (3, [0, 2, 1, 0]))]

You could also do a combineByKey, as described here: 'combineByKey', pyspark.
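
For completeness, a rough combineByKey version of the small example above (my own sketch, not from the linked answer, reusing the rdd defined earlier) could look like this:

    result = rdd.combineByKey(
        lambda v: (v[0], list(v[1])),                            # createCombiner
        lambda acc, v: (acc[0] + v[0],
                        [a + b for a, b in zip(acc[1], v[1])]),  # mergeValue
        lambda a, b: (a[0] + b[0],
                      [p + q for p, q in zip(a[1], b[1])])       # mergeCombiners
    )
    print (result.collect())   # [('librarian', (3, [0, 2, 1, 0]))]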

Also note that this, in the second-to-last line, looks like a typo: "(x[1][2]+y[1][12])".
