I am trying to determine which files were processed by a mapreduce job on App Engine. I am using a wildcard on a Cloud Storage bucket:

    from mapreduce import base_handler
    from mapreduce import mapreduce_pipeline


    class FilesPipeline(base_handler.PipelineBase):
        def run(self):
            output_blobstore_ids = yield mapreduce_pipeline.MapreducePipeline(
                "proc_files",
                "project.task.proc_files.mapper",
                "project.task.proc_files.reducer",
                "mapreduce.input_readers.FileInputReader",
                "mapreduce.output_writers.BlobstoreOutputWriter",
                mapper_params={
                    'shard_count': 4,
                    'batch_size': 50,
                    # Wildcard: objects in the bucket whose names start with "partner"
                    'files': ['/gs/project_inbox/partner*'],
                    'format': 'lines'},
                reducer_params={},
                shards=4)
            yield StoreOutput(output_blobstore_ids)

Ideally, I would like to have the list of files processed by the mapreduce pipeline available in the StoreOutput instance, but anywhere would do.
Thanks!
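Here is what I ended up with.

Preparing the pipeline:

    # Resolve the file list up front so the caller knows exactly which files are processed
    files_list = self.get_files_list()
    pipeline = FilesPipeline(mapper_files=files_list)
    pipeline.start(queue_name='proc-files')
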
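The body of get_files_list is not shown in this post. As a rough sketch of what such a helper might do, the function below expands '/gs/<bucket>/<pattern>' entries into concrete paths. Everything in it is an assumption made for illustration: the names expand_gs_patterns and fake_list_bucket are hypothetical, the bucket listing is injected as a plain callable rather than taken from a real Cloud Storage client, and the shell-style matching from fnmatch may not match FileInputReader's own wildcard rules exactly.

```python
import fnmatch


def expand_gs_patterns(patterns, list_bucket):
    """Expand '/gs/<bucket>/<pattern>' entries into concrete file paths.

    list_bucket is a hypothetical callable supplied by the caller: given a
    bucket name, it returns the object names stored in that bucket.
    """
    files = []
    for pattern in patterns:
        parts = pattern.split('/', 3)
        if len(parts) != 4 or parts[0] != '' or parts[1] != 'gs':
            raise ValueError('expected /gs/<bucket>/<object>: %r' % pattern)
        bucket, object_pattern = parts[2], parts[3]
        # Sort for a stable order, then keep names matching the wildcard.
        for name in sorted(list_bucket(bucket)):
            if fnmatch.fnmatchcase(name, object_pattern):
                path = '/gs/%s/%s' % (bucket, name)
                if path not in files:  # skip duplicates across patterns
                    files.append(path)
    return files


# Stand-in listing for illustration only; real code would query Cloud Storage.
def fake_list_bucket(bucket):
    listing = {'project_inbox': ['partner_a.csv', 'partner_b.csv', 'other.csv']}
    return listing.get(bucket, [])


files_list = expand_gs_patterns(['/gs/project_inbox/partner*'], fake_list_bucket)
print(files_list)
```

Because the list is built before the pipeline starts, the same files_list can be logged, stored, or handed to any later stage that needs to know which files were processed.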
Pipeline definition:

    class FilesPipeline(base_handler.PipelineBase):
        def run(self, mapper_files=[]):
            output_blobstore_ids = yield mapreduce_pipeline.MapreducePipeline(
                "proc_files",
                "project.task.proc_files.mapper",
                "project.task.proc_files.reducer",
                "mapreduce.input_readers.FileInputReader",
                "mapreduce.output_writers.BlobstoreOutputWriter",
                mapper_params={
                    'shard_count': 4,
                    'batch_size': 50,
                    # Explicit file list passed in by the caller instead of a wildcard
                    'files': mapper_files,
                    'format': 'lines'},
                reducer_params={},
                shards=4)
            yield StoreOutput(output_blobstore_ids)