How do I chain multiple MapReduce jobs in Google App Engine (Python)?



Caveat: I am new to Google App Engine and Python, but so far I have managed to implement the PageRank algorithm in Google App Engine.

Next, I would like to chain three MapReduce jobs together in Google App Engine. However, I don't understand how to pass the key-value pairs from the first MapReduce job to the second (and subsequently from the second to the third) using BlobKeys. I tried to follow the approach described at:

http://mattfaus.com/2013/10/google-appengine-mapreduce-in-depth/

which uses a BlobKeys class to pass a BlobKey from one MapReduce job to the next. I think I have implemented the Python class incorrectly, because when it is called, the "third_party" object in the code below is not recognized.

Perhaps someone can point out where I am going wrong. Apologies for not being able to provide a locally runnable test; this thing seems to be a bit of a beast!

Here is the class I am trying to use:

class BlobKeys(mapreduce.base_handler.PipelineBase):
  """Returns a dictionary with the supplied keyword arguments."""
  def run(self, keys):
    # Extract the key from a string in this format:
    # /blobstore/<key>
    return {
        "blob_keys": [k.split("/")[-1] for k in keys]
    }
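For reference, the transform inside BlobKeys.run can be exercised on its own. This sketch (plain Python, no App Engine dependencies, with made-up sample keys) shows what the returned dict looks like:

```python
# Stand-alone sketch of the key-stripping transform used by BlobKeys.run.
# BlobstoreOutputWriter reports its output as "/blobstore/<key>" paths,
# while BlobstoreLineInputReader expects bare "<key>" strings.

def strip_blobstore_prefix(keys):
    """Mimics BlobKeys.run: keep only the part after the last '/'."""
    return {"blob_keys": [k.split("/")[-1] for k in keys]}

# Hypothetical sample output from a previous MapreducePipeline:
sample = ["/blobstore/key-one", "/blobstore/key-two"]
print(strip_blobstore_prefix(sample))
# → {'blob_keys': ['key-one', 'key-two']}
```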

Here is the Pipeline code that calls the class above (it does not recognize the third_party object):

num_shards=2
# First define the parent pipeline job
class RecommenderPipeline(base_handler.PipelineBase):
  """A pipeline to run Recommender demo.
  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """
  def run(self, filekey, blobkey, itr):
    #logging.debug("filename is %s" % filekey)
    output1 = yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_group_by_user_rating_map1",
        "main.recommender_count_ratings_user_freq_reduce1",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_keys": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=num_shards)

    # Code below takes output1 and feeds into second mapreduce job.
    # Pipeline library ensures that the second pipeline depends on first and 
    # does not launch until the first has resolved.
    output2 = (
    yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_pairwise_items_map2",
        "main.recommender_calc_similarity_reduce2",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params=(BlobKeys(output1)), # see BlobKeys class above
        # Simply passing "blob_keys": blobkey did not work, since "generator
        # pipelines cannot directly access outputs of the child Pipelines
        # that it yields"; that would require the generator pipeline to
        # build a temporary dict from the output of the first job, which is
        # not allowed.
        # In addition, the string returned by BlobstoreOutputWriter is in the
        # format /blobstore/<key>, but BlobstoreLineInputReader expects only
        # "<key>". The BlobKeys class above is meant to solve both problems.
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=num_shards))
    # Code below takes output2 and feeds into third mapreduce job.
    # Pipeline library ensures that the third pipeline depends on second and 
    # does not launch until the second has resolved.
    output3 = (
    yield mapreduce_pipeline.MapreducePipeline(
        "recommender",
        "main.recommender_calculate_ranking_map3",
        "main.recommender_ranked_items_reduce3",
        "mapreduce.input_readers.BlobstoreLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params=(BlobKeys(output2)), # see BlobKeys class above
        #mapper_params={
        #    "blob_keys": blobkey.split("/")[-1],
        #},
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=num_shards))
    yield StoreOutput("Recommender", filekey, output3, itr)  #stores key to results so you can look at it.

I wonder whether I have more of a problem with using Python classes correctly, or more of a problem with implementing this in GAE. I suspect a bit of both. Any help would be greatly appreciated! Thanks!

A pipeline argument can be either a concrete value or a PipelineFutures (in which case the pipeline will wait until the future's value is available). In your case, you are passing a PipelineFutures (BlobKeys(output1)) as an argument where a concrete value is expected. Instead, try yielding BlobKeys(output1) first, and then pass its result as a parameter to the next pipeline. For example:

output1_1 = yield BlobKeys(output1)
output2 = yield mapreduce_pipeline.MapreducePipeline(
    ...,
    mapper_params=output1_1,
    ...)
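The difference between passing the future and yielding it first can be modeled without App Engine at all. The sketch below is a toy driver with made-up class names, not the real pipeline library: it resolves each yielded child and sends the concrete result back into the generator, which is why yielding a child gives you a plain dict rather than an unresolved future.

```python
# Toy model of the Pipeline API's generator semantics (NOT the real
# mapreduce library). The driver runs each yielded child pipeline and
# feeds the concrete result back into the generator.

class FakeBlobKeys(object):
    """Stand-in for the BlobKeys pipeline in the question."""
    def __init__(self, keys):
        self.keys = keys

    def run(self):
        # Same transform as BlobKeys.run: strip the "/blobstore/" prefix.
        return {"blob_keys": [k.split("/")[-1] for k in self.keys]}

def drive(pipeline_gen):
    """Minimal driver: resolve each yielded child, send its result back."""
    result = None
    try:
        child = pipeline_gen.send(None)
        while True:
            result = child.run()
            child = pipeline_gen.send(result)
    except StopIteration:
        return result

def chain(output1):
    # Yielding resolves the child first; the generator receives a dict.
    resolved = yield FakeBlobKeys(output1)
    assert isinstance(resolved, dict)  # a concrete value, not a future
    # The resolved dict can now feed the next stage as mapper_params would.
    yield FakeBlobKeys(resolved["blob_keys"])

final = drive(chain(["/blobstore/key-one", "/blobstore/key-two"]))
print(final)
# → {'blob_keys': ['key-one', 'key-two']}
```

By contrast, passing FakeBlobKeys(output1) directly as a parameter would hand the next stage the unresolved object itself, which is the error in the question's mapper_params.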
