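I am using the Google mapreduce library to process my data. During processing, a counter is incremented in the map function, but I don't know how to read the counter result in the finalized method.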
def mapper(obj):
    yield obj
    yield operation.counters.Increment("process-obj")


class Test(base_handler.PipelineBase):
    """A pipeline to ingest log as CSV in Google Storage
    """

    def run(self, setting_id):
        filepath = yield mapreduce_pipeline.MapperPipeline(
            "test",
            "mapper",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
            params={
            },
            shards=10
        )

    def finalized(self):
        # how to read the counter process-obj
        # how to get the setting_id
        pass
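Named outputs may be what you are looking for. You can find more details here.
Below is your code, using a named output to return the various counters, including the one you defined: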
def mapper(obj):
    yield obj
    yield operation.counters.Increment("process-obj")


class Test(base_handler.PipelineBase):
    """A pipeline to ingest log as CSV in Google Storage
    """
    output_names = ['counters']

    def run(self, setting_id):
        results = yield mapreduce_pipeline.MapperPipeline(
            "test",
            "mapper",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
            params={
            },
            shards=10
        )
        yield MapreduceResult(results.counters)

    def finalized(self):
        print 'Counters here: ', self.outputs.counters


class MapreduceResult(base_handler.PipelineBase):

    def run(self, counters):
        self.fill(self.outputs.counters, counters)
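
Once the counters reach finalized, you will probably want a single value rather than the whole collection. The sketch below assumes the counters output holds a plain dict mapping counter names to integer totals, and that a counter which was never incremented is simply missing from it. The helper name read_counter and the sample values are hypothetical, used only for illustration:

```python
def read_counter(counters, name, default=0):
    """Return the total for one counter from a counters mapping.

    Assumption: `counters` is a plain dict of counter name -> integer.
    A counter that was never incremented is assumed to be absent from
    the dict, so we fall back to `default` instead of raising KeyError.
    """
    if not counters:
        # Covers both None and an empty dict.
        return default
    return counters.get(name, default)


# Hypothetical sample of what the counters output might contain.
sample_counters = {
    "process-obj": 1250,
    "mapper-calls": 1250,
}

processed = read_counter(sample_counters, "process-obj")
missing = read_counter(sample_counters, "never-incremented")
```

Inside finalized this could be called as read_counter(self.outputs.counters.value, "process-obj"), assuming the output slot exposes its contents through a value attribute; check this against the version of the pipeline library you are running.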