我已经创建了一个相当简单的MapReduce管道,但我有一个神秘的:
PipelineSetupError: Error starting production.cron.pipelines.ItemsInfoPipeline(*(), **{})#a741186284ed4fb8a4cd06e38921beff:
,当我试图启动它。这是管道代码:
class ItemsInfoPipeline(base_handler.PipelineBase):
"""
"""
def run(self):
output = yield mapreduce_pipeline.MapreducePipeline(
job_name="items_job",
mapper_spec="production.cron.mappers.items_info_mapper",
input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
mapper_params={
"input_reader": {
"entity_kind": "production.models.Transaction"
}
}
)
yield ItemsInfoStorePipeline(output)
class ItemsInfoStorePipeline(base_handler.PipelineBase):
"""
"""
def run(self, statistics):
print statistics
return "OK"
当然,我已经仔细检查了映射器路径是正确的,并考虑到ItemsInfoStorePipeline没有做任何事情,因为我正在关注管道启动,这并没有发生。
这些都是由Flask视图触发的,如下所示:
class ItemsInfoMRJob(views.MethodView):
"""
It's based on transacions.
"""
def get(self):
"""
:return:
"""
pipeline = ItemsInfoPipeline()
pipeline.start()
redirect_url = "%s/status?root=%s" % (pipeline.base_path, pipeline.pipeline_id)
return flask.redirect(redirect_url)
我使用GoogleAppEngineMapReduce==1.9.22.0
谢谢你的帮助。
上面的代码一旦部署就可以工作了。
更新2
显然有更多的人在处理这个问题:
https://github.com/GoogleCloudPlatform/appengine-mapreduce/issues/103我正在更新这个。我有一个使用管道的代码库,在OSX中运行良好。我有另一个使用OSX的开发人员,我做的任何事情似乎都不能让它工作,他得到了这个:
ProtoRPC方法实现时遇到意外错误:PipelineSetupError
我试着交换版本,使我们的PC完美匹配,它继续发生。我终于崩溃了,在docker中构建了一个Ubuntu镜像。我也在尽我最大的努力使我们的AppEngine和库的版本完美匹配。
它也拒绝以相同的消息开始。我开始在库中消除吞噬错误的部分的注释,但这是一个很长的兔子洞,因为上面的很多东西似乎也吞噬了正在发生的任何事情。