AppEngine MapReduce在使用数据存储输入阅读器时如何过滤structureproperty



在使用appengine mapreduce库时,如何通过StructuredProperty进行过滤?

我试着:

class Tag(ndb.Model):
    # ...
    tag = ndb.StringProperty()
    value = ndb.FloatProperty(indexed=False)
class User(ndb.Model):
    # ...
    tags = ndb.StructuredProperty(Tag, repeated=True)
class SamplePipeline(base_handler.PipelineBase):

    def run(self, tags, start_time, account_type, gsbucketname):
        start_time = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        filters = []
        for tag in tags:
            filters.append(("tags.tag", "=", tag))
        yield mapreduce_pipeline.MapperPipeline(
            "name",
            "mapper",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
            params={
                "input_reader": {
                    "entity_kind": "User",
                    "batch_size": 500,
                    "filters": filters
                },
                "output_writer": {
                    "filesystem": "gs",
                    "gs_bucket_name": gsbucketname,
                },
                "root_pipeline_id": self.root_pipeline_id,
                "account_type": account_type
            },
            shards=255
        )

我得到了什么

File "/Users/lucemia/vagrant_home/adex2/lib/mapreduce/input_readers.py", line 794, in _validate_filters_ndb
    prop, model_class._get_kind())
BadReaderParamsError: ('Property %s is not defined for entity type %s', u'tags.tag', 'User')

看来mapreduce/input_readers.py_validate_filters_ndb()函数确实有bug

作为一种工作,您可以在mapreduce/input_readers.py的第792行添加以下行,以跳过对其中包含.的属性的验证,例如StructuredProperty类型的属性:

if "." in prop:
    continue

我们将修复StructuredProperty类型属性的正确验证,并在未来的版本中提供正确的代码。

相关内容

  • 没有找到相关文章

最新更新