迷你可迭代模型映射器为谷歌应用引擎python



我认为我想要的东西不存在。谁能帮我创建一个迷你映射类?详细的伪代码或实际的python都可以。 更新:简单,工作版本在文章底部。

更新2 - 6月20日

  • 更安全的执行:continue/break/return within iterall()现在可以工作了
  • 添加了defer_db标志,用于向任务队列发送db put和delete
  • 为抽象的目的,可以指定一个过滤器函数,每个实体必须通过,否则它将不会在迭代时返回。
  • 将。bdelete()更改为。bdel()

更新3 - 6月21日

  • 修复了上次更新中导致无法保存的主要错误。
我希望这对除了我以外的人有用。我经常使用它,它是MapReduce的一个舒适的中间部分,当你只需要游标时,因为你不确定你要处理多少结果。

这是关于什么的?

gae的mapreduce库很棒,但我想要一些轻量级和一次性的东西。在python gae教程中,您经常会看到迭代、修改和保存数据库模型。我不认为有更多这样的例子,因为我们知道这是非常低效的,每个循环调用一次数据存储,而不是批处理。虽然我喜欢这个接口,但我经常发现自己需要一种简单快速的方式来运行我的数据库模型。

会是什么样子?

使用

  1. 导入类
  2. 告诉它你想映射到哪个模型
  3. 给它可选的查询过滤器
  4. 获取迭代器对象
  5. 循环,安全的知道你没有做成千上万的不必要的db调用。

幕后

这就是我需要你帮助的地方,因为我觉得我不知所措

Generator (我从来没有使用过生成器,只是有点理解它们) object批量抓取数据存储项(抓取多少是安全的?)是否存在硬性限制,还是取决于项目大小?),并以可迭代的方式呈现它们。一旦达到MAX_AMOUNT batch_size,将批处理保存到数据存储中,并无缝地抓取下一批(光标)。

我正在考虑的一件事是使用defer将项目保存到db,目的是在我们循环许多项目时节省一些时间。可能的缺点是下一部分代码期望地图已经完成。所以我认为这将是很好的有一个'defer_db'标志被设置或忽略取决于用户的偏好。如果你只期待少量的项目,那么你就不会设置defer标志。

结论

请为这个小项目贡献代码概念。接受的答案将是一个星期后获得最多赞的答案。诚然,我觉得让SO为我想出一个解决方案有点肮脏,但说实话,我觉得自己无法胜任这项任务。我希望它对你有用。

<标题> 例子

相同的查询函数

country_mim = MIM(CountryModels.all()).filter("spoken_language =", "French")
country_mim.order("population")

嵌套迭代
some_mim = MIM(SomeModel.all())
for x in some_mim.iterall():
    if x.foo == 'ham sandwich':
        sandwich_mim = MIM(MySandwiches.all())
        for sandwich in sandwich_mim.iterall():
            if 'ham' in sandwich.ingredients:
                print 'yay'

批量保存&删除

country_mim = MIM(CountryModels.all()).order("drinking_age")
for country in country_mim.iterall():
    if country.drinking_age > 21:   # these countries should be nuked from orbit
        country_mim.bdel(country)   # delete
    if country.drinking_age == 18:
        country.my_thoughts = "god bless you foreigners"
        country_mim.bput(country)   # save
    if country.drinking_age < 10:   # panic
        country.my_thoughts = "what is this i don't even..."
        country_mim.bput(country)
        break   # even though we panicked, the bput still resolves

MiniIterMapper.py

我已经使用这个代码好几个星期了,一切似乎都很好。Defer不包括在内。查询facade代码是从大PagedQuery模块窃取的(经过许可)。支持批量保存和批量删除

import google.appengine.ext.db as db
from google.appengine.ext.deferred import defer
class MIM(object):
    """
    All standard Query functions (filter, order, etc) supported*. Default batch
    size is 100. defer_db=True will cause put and delete datastore operations to
    be deferred. allow_func accepts any function you wish and only the entities
    that cause the function to return a true value will be returned during
    iterall(). Using break/continue/return while iterating doesn't cause things
    to explode (like it did in the 1st version).
    * - thanks to http://code.google.com/p/he3-appengine-lib/wiki/PagedQuery
    """
    def __init__(self, query, batch_size=100, defer_db=False, allow_func=None):
        self._query =       query
        self._batch_size =  batch_size
        self._defer_db =    defer_db
        self._allow_func =  allow_func
        self._to_save =     []
        self._to_delete =   []
        # find out if we are dealing with another facade object
        if query.__dict__.has_key('_query'): query_to_check = query._query
        else: query_to_check  = query
        if isinstance(query_to_check, db.Query):        self._query_type = 'Query'
        elif isinstance(query_to_check, db.GqlQuery):   self._query_type = 'GqlQuery'
        else: raise TypeError('Query type not supported: ' + type(query).__name__)
    def iterall(self):
        "Return iterable over all datastore items matching query. Items pulled from db in batches."
        results =               self._query.fetch(self._batch_size) # init query
        savedCursor =           self._query.cursor()                # init cursor
        try:
            while results:
                for item in results:
                    if self._allow_func:
                        if self._allow_func(item):
                            yield item
                    else:
                        yield item
                if len(results) ==  self._batch_size:
                    results =       self._query.with_cursor(savedCursor).fetch(self._batch_size)
                    savedCursor =   self._query.cursor()
                else:                   # avoid additional db call if we don't have max amount
                    results =       []  # while loop will end, and go to else section.
            else:
                self._finish()
        except GeneratorExit:
            self._finish()
    def bput(self, item):
        "Batch save."
        self._to_save.append(item)
        if len(self._to_save) >= self._batch_size:
            self._bput_go()
    def bdel(self, item):
        "Batch delete."
        self._to_delete.append(item)
        if len(self._to_delete) >= self._batch_size:
            self._bdel_go()
    def _bput_go(self):
        if self._defer_db:
            defer(db.put, self._to_save)
        else: db.put(self._to_save)
        self._to_save = []
    def _bdel_go(self):
        if self._defer_db:
            defer(db.delete, self._to_delete)
        else: db.delete(self._to_delete)
        self._to_delete = []
    def _finish(self):
        "When done iterating through models, could be that the last few remaining weren't put/deleted yet."
        if self._to_save:   self._bput_go()
        if self._to_delete: self._bdel_go()
    # FACADE SECTION >>>
    def fetch(self, limit, offset=0):
        return self._query.fetch(limit,offset)
    def filter(self, property_operator, value):
        self._check_query_type_is('Query')
        self._query = self._query.filter(property_operator, value)
        return self
    def order(self, property):
        self._check_query_type_is('Query')
        self._query.order(property)
        return self
    def ancestor(self, ancestor):
        self._check_query_type_is('Query')
        self._query.ancestor(ancestor)
        return self
    def count(self, limit=1000):
        return self._query.count(limit)
    def _check_query_type_is(self, required_query_type):
        if self._query_type != required_query_type:
            raise TypeError('Operation not allowed for query type ('
                            + type(self._query).__name__)

为什么不想使用Mapreduce?它正是为这个用例设计的,已经完成了您想要的所有功能,并且可以通过编程方式调用。"轻量级"是一个非常模糊的术语,但我不知道mapreduce库是否完全适合您的任务的任何理由-并且几乎没有理由复制该功能。

相关内容

  • 没有找到相关文章

最新更新