在使用appengine mapreduce库时,如何通过StructuredProperty
进行过滤?
我试着:
class Tag(ndb.Model):
# ...
tag = ndb.StringProperty()
value = ndb.FloatProperty(indexed=False)
class User(ndb.Model):
# ...
tags = ndb.StructuredProperty(Tag, repeated=True)
class SamplePipeline(base_handler.PipelineBase):
def run(self, tags, start_time, account_type, gsbucketname):
start_time = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
filters = []
for tag in tags:
filters.append(("tags.tag", "=", tag))
yield mapreduce_pipeline.MapperPipeline(
"name",
"mapper",
"mapreduce.input_readers.DatastoreInputReader",
output_writer_spec="mapreduce.output_writers.FileOutputWriter",
params={
"input_reader": {
"entity_kind": "User",
"batch_size": 500,
"filters": filters
},
"output_writer": {
"filesystem": "gs",
"gs_bucket_name": gsbucketname,
},
"root_pipeline_id": self.root_pipeline_id,
"account_type": account_type
},
shards=255
)
我得到了什么
File "/Users/lucemia/vagrant_home/adex2/lib/mapreduce/input_readers.py", line 794, in _validate_filters_ndb
prop, model_class._get_kind())
BadReaderParamsError: ('Property %s is not defined for entity type %s', u'tags.tag', 'User')
看来mapreduce/input_readers.py
的_validate_filters_ndb()
函数确实有bug
作为一种工作,您可以在mapreduce/input_readers.py
的第792行添加以下行,以跳过对其中包含.
的属性的验证,例如StructuredProperty类型的属性:
if "." in prop:
continue
我们将修复StructuredProperty类型属性的正确验证,并在未来的版本中提供正确的代码。