Files processed in mapreduce



I'm trying to find out which files were processed by a mapreduce job on App Engine. I'm using a wildcard on a Cloud Storage bucket:

class FilesPipeline(base_handler.PipelineBase):
    def run(self):
        output_blobstore_ids = yield mapreduce_pipeline.MapreducePipeline(
            "proc_files",
            "project.task.proc_files.mapper",
            "project.task.proc_files.reducer",
            "mapreduce.input_readers.FileInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter"
            ,mapper_params={
              'shard_count': 4,
              'batch_size': 50,
              'files': ['/gs/project_inbox/partner*'],
              'format': 'lines'}
            ,reducer_params={
            },
            shards=4)
        yield StoreOutput(output_blobstore_ids)

Ideally, I'd like to get the list of files the mapreduce pipeline processed inside the StoreOutput instance, but anywhere would do.

Thanks!

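Here is what ended up working. Instead of passing a wildcard, build the explicit list of files before starting the pipeline and hand it to the pipeline as an argument, so the processed files are known up front: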

Preparing the pipeline

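# get_files_list() is the poster's own helper (not shown); it returns the
# explicit list of /gs/... paths that the mapper should read.
files_list = self.get_files_list()
pipeline = FilesPipeline(mapper_files=files_list)
pipeline.start(queue_name='proc-files')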
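The answer relies on `get_files_list()`, which isn't shown. Below is a minimal sketch of just the wildcard-expansion part, assuming you already have the bucket's object names as `/bucket/object` strings. On App Engine these might come from a bucket listing such as the GCS client library's `cloudstorage.listbucket`, whose results carry a `filename` in that form; treat that source as an assumption to verify. The function name `expand_gs_wildcard` is hypothetical, and it uses Python's `fnmatch`, which may not match exactly how `FileInputReader` interprets wildcards.

```python
import fnmatch


def expand_gs_wildcard(pattern, object_paths):
    """Return the '/gs/...' paths in object_paths that match a '/gs/...' wildcard.

    Hypothetical helper: object_paths is assumed to be an iterable of
    '/bucket/object' strings, e.g. filenames collected from a bucket listing.
    """
    gs_prefix = "/gs"
    if not pattern.startswith(gs_prefix + "/"):
        raise ValueError("expected a pattern like '/gs/bucket/name*'")
    # Drop the '/gs' prefix so the pattern lines up with '/bucket/object' names.
    bucket_pattern = pattern[len(gs_prefix):]
    # Note: fnmatch's '*' also matches '/', so nested "folders" match too.
    matches = [p for p in object_paths if fnmatch.fnmatchcase(p, bucket_pattern)]
    # Sort so the list handed to the pipeline (and later inspected) is stable.
    return [gs_prefix + p for p in sorted(matches)]


if __name__ == "__main__":
    names = [
        "/project_inbox/partner_a.csv",
        "/project_inbox/other.csv",
        "/project_inbox/partner_b.csv",
    ]
    print(expand_gs_wildcard("/gs/project_inbox/partner*", names))
```

Because the result is a plain list, the same value that goes into `mapper_params['files']` can also be logged or stored, which is what makes the processed files visible.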
Pipeline definition

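from mapreduce import base_handler
from mapreduce import mapreduce_pipeline


class FilesPipeline(base_handler.PipelineBase):
    def run(self, mapper_files=None):
        # mapper_files is the explicit list of /gs/... paths built before the
        # pipeline was started, so the processed files are known up front.
        output_blobstore_ids = yield mapreduce_pipeline.MapreducePipeline(
            "proc_files",
            "project.task.proc_files.mapper",
            "project.task.proc_files.reducer",
            "mapreduce.input_readers.FileInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "shard_count": 4,
                "batch_size": 50,
                "files": mapper_files or [],  # explicit list, no wildcard
                "format": "lines",
            },
            reducer_params={},
            shards=4)
        # StoreOutput is the poster's own pipeline (not shown).
        yield StoreOutput(output_blobstore_ids)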
