Python AppEngine MapReduce



我已经创建了一个相当简单的MapReduce管道,但我有一个神秘的:

PipelineSetupError: Error starting production.cron.pipelines.ItemsInfoPipeline(*(), **{})#a741186284ed4fb8a4cd06e38921beff:

,当我试图启动它。这是管道代码:

class ItemsInfoPipeline(base_handler.PipelineBase):
"""
"""
    def run(self):
        output = yield mapreduce_pipeline.MapreducePipeline(
            job_name="items_job",
            mapper_spec="production.cron.mappers.items_info_mapper",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            mapper_params={
                "input_reader": {
                    "entity_kind": "production.models.Transaction"
                }
            }
        )
        yield ItemsInfoStorePipeline(output)

class ItemsInfoStorePipeline(base_handler.PipelineBase):
"""
"""
    def run(self, statistics):
        print statistics
        return "OK"

当然,我已经仔细检查了映射器路径是正确的,并考虑到ItemsInfoStorePipeline没有做任何事情,因为我正在关注管道启动,这并没有发生。

这些都是由Flask视图触发的,如下所示:

class ItemsInfoMRJob(views.MethodView):
"""
It's based on transacions.
"""
    def get(self):
    """
    :return:
    """
        pipeline = ItemsInfoPipeline()
        pipeline.start()
        redirect_url = "%s/status?root=%s" % (pipeline.base_path, pipeline.pipeline_id)
        return flask.redirect(redirect_url)

我使用GoogleAppEngineMapReduce==1.9.22.0

谢谢你的帮助。

上面的代码一旦部署就可以工作了。

更新2

显然有更多的人在处理这个问题:

https://github.com/GoogleCloudPlatform/appengine-mapreduce/issues/103

我正在更新这个。我有一个使用管道的代码库,在OSX中运行良好。我有另一个使用OSX的开发人员,我做的任何事情似乎都不能让它工作,他得到了这个:

ProtoRPC方法实现时遇到意外错误:PipelineSetupError

我试着交换版本,使我们的PC完美匹配,它继续发生。我终于崩溃了,在docker中构建了一个Ubuntu镜像。我也在尽我最大的努力使我们的AppEngine和库的版本完美匹配。

它也拒绝以相同的消息开始。我开始在库中消除吞噬错误的部分的注释,但这是一个很长的兔子洞,因为上面的很多东西似乎也吞噬了正在发生的任何事情。

相关内容

  • 没有找到相关文章

最新更新