警告:我是Google App Engine和Python的新手,但到目前为止,我已经设法在Google App Engine中实现了PageRank算法。
接下来,我想在Google App Engine中链接三个mapreduce作业。然而,我不明白如何使用BlobKeys将键值对从第一个mapreduce作业传递到第二个mapreduce作业(随后第二个mapreduce作业传递到第三个mapreduce作业)。我试着按照以下网站上的介绍去做:
http://mattfaus.com/2013/10/google-appengine-mapreduce-in-depth/使用BlobKeys类将BlobKey从一个mapreduce作业传递到下一个mapreduce作业。我认为我错误地实现了python类,因为当调用时,"third_party"对象在下面的代码中无法识别。
也许有人能指出我错在哪里。很抱歉无法提供本地驱动的测试。这似乎是一个小野兽!
下面是我要使用的类:
class BlobKeys(mapreduce.base_handler.PipelineBase):
"""Returns a dictionary with the supplied keyword arguments."""
def run(self, keys):
# Remove the key from a string in this format:
# /blobstore/<key>
return {
"blob_keys": [k.split("/")[-1] for k in keys]
}
下面是调用上面类的Pipeline代码(不识别third_party对象):
num_shards=2
# First define the parent pipeline job
class RecommenderPipeline(base_handler.PipelineBase):
"""A pipeline to run Recommender demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey, itr):
#logging.debug("filename is %s" % filekey)
output1 = yield mapreduce_pipeline.MapreducePipeline(
"recommender",
"main.recommender_group_by_user_rating_map1",
"main.recommender_count_ratings_user_freq_reduce1",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_keys": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=num_shards)
# Code below takes output1 and feeds into second mapreduce job.
# Pipeline library ensures that the second pipeline depends on first and
# does not launch until the first has resolved.
output2 = (
yield mapreduce_pipeline.MapreducePipeline(
"recommender",
"main.recommender_pairwise_items_map2",
"main.recommender_calc_similarity_reduce2",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params=( BlobKeys(output1)), #see BlobKeys Class!`
# "blob_keys": [k.split("/")[-1] for k in keys]
#"blob_keys": blobkey, # did not work since "generator pipelines cannot
# directly access ouputs of the child Pipelines that it yields", this code
# would require the generator pipeline to create a temporary dict object
# with the output of the first job - this is not allowed.
# In addition, the string returned by BobStoreOutputWriter is in the format
# /blobstore/<key>, but BlobStoreLineInputReader expects only "<key>"
# To solve these problems, use the BlobKeys class above.
#},
#mapper_params={
# #"blob_keys": [k.split("/")[-1] for k in output1]
# "blob_keys": blobkey.split("/")[-1],
#},
reducer_params={
"mime_type": "text/plain",
},
shards=num_shards))
# Code below takes output2 and feeds into third mapreduce job.
# Pipeline library ensures that the third pipeline depends on second and
# does not launch until the second has resolved.
output3 = (
yield mapreduce_pipeline.MapreducePipeline(
"recommender",
"main.recommender_calculate_ranking_map3",
"main.recommender_ranked_items_reduce3",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params=( BlobKeys(output2)), #see BobKeys Class!`
#mapper_params={
# "blob_keys": blobkey.split("/")[-1],
#},
reducer_params={
"mime_type": "text/plain",
},
shards=num_shards))
yield StoreOutput("Recommender", filekey, output3, itr) #stores key to results so you can look at it.
我想知道我是否有更多的问题正确使用Python类,或者更多的问题在GAE中实现这一点。我怀疑两者兼而有之。任何帮助将非常感激!谢谢!
一个pipeline参数可以是一个具体的值或一个PipelineFutures(在这种情况下,它将等待直到future的值可用)。在你的例子中,你正在传递一个PipelineFutures作为一个具体值(BlobKeys)的参数。相反,尝试生成BlobKeys(output1)并将其结果作为参数传递给下一个管道。例句:output1_1 = yield BlobKeys(output1)output2 = yield mapreduce_pipeline. mapreduceppipeline(…)mapper_params = output1_1…)