I'm trying out the mapreduce framework (http://code.google.com/p/appengine-mapreduce/) and have slightly modified the demo application to use mapreduce.input_readers.DatastoreInputReader instead of mapreduce.input_readers.BlobstoreZipInputReader. I set up two pipeline classes:
import logging

from google.appengine.ext import db
from mapreduce import base_handler, mapreduce_pipeline
from mapreduce import operation as op


class IndexPipeline(base_handler.PipelineBase):
    def run(self):
        output = yield mapreduce_pipeline.MapreducePipeline(
            "index",
            "main.index_map",     # defined higher up in the code
            "main.index_reduce",  # defined higher up in the code
            "mapreduce.input_readers.DatastoreInputReader",
            mapper_params={
                "entity_kind": "model.SearchRecords",
            },
            shards=16)
        yield StoreOutput("Index", output)


class StoreOutput(base_handler.PipelineBase):
    def run(self, mr_type, encoded_key):
        logging.info("output is %s %s" % (mr_type, str(encoded_key)))
        if encoded_key:
            key = db.Key(encoded=encoded_key)
            m = db.get(key)
            yield op.db.Put(m)
Then I kick it off with:
pipeline = IndexPipeline()
pipeline.start()
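For context, the map and reduce functions registered above as "main.index_map" and "main.index_reduce" look roughly like this. This is a hypothetical sketch: the text property on model.SearchRecords and the reducer's output format are my assumptions for illustration, not taken from the demo.

```python
# Hypothetical sketch of the functions registered as "main.index_map" and
# "main.index_reduce". The "text" property and the reducer output format
# are assumptions for illustration.
def index_map(entity):
    # Called once per model.SearchRecords entity fed in by DatastoreInputReader;
    # emits (key, value) pairs for the shuffle phase.
    for word in entity.text.split():
        yield (word.lower(), str(entity.key()))


def index_reduce(key, values):
    # Called once per unique key with the list of all values mapped to it;
    # whatever it yields is handed to the output writer (if one is set).
    yield "%s: %d\n" % (key, len(values))
```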
But I keep getting this error:
Handler yielded two: ['a'] , but no output writer is set.
I've tried to find the place in the source code where the output writer gets set, but without success. The only thing I found is that an output_writer_class should apparently be set somewhere.
Does anyone know how to set this?
On a side note, the encoded_key argument in StoreOutput always seems to be None.
The output writer has to be defined as an argument to mapreduce_pipeline.MapreducePipeline (cf. its docstring):
class MapreducePipeline(base_handler.PipelineBase):
  """Pipeline to execute MapReduce jobs.

  Args:
    job_name: job name as string.
    mapper_spec: specification of mapper to use.
    reducer_spec: specification of reducer to use.
    input_reader_spec: specification of input reader to read data from.
    output_writer_spec: specification of output writer to save reduce output to.
    mapper_params: parameters to use for mapper phase.
    reducer_params: parameters to use for reduce phase.
    shards: number of shards to use as int.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, list of values and list of previously combined results. It yields
      combined values that might be processed by another combiner call, but will
      eventually end up in reducer. The combiner output key is assumed to be the
      same as the input key.

  Returns:
    filenames from output writer.
  """