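I don't think what I want exists yet. Can anyone help me create a mini mapper class? Detailed pseudocode or actual Python is fine.
Update: A simple, working version is at the bottom of the post.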
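Update 2 - June 20
- Safer execution: continue/break/return within iterall() now works.
- Added a defer_db flag for sending db puts and deletes to the task queue.
- For abstraction purposes, you can specify a filter function that each entity must pass; otherwise it will not be returned during iteration.
- Changed .bdelete() to .bdel()
Update 3 - June 21
- Fixed a major bug from the last update that prevented saving.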
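What is this about?
GAE's mapreduce library is great, but I want something lightweight and disposable. In Python GAE tutorials you often see database models being iterated over, modified, and saved. I suspect there aren't more examples like that because we know it is very inefficient: it calls the datastore once per loop iteration instead of batching. I like that interface, though, and I often find myself needing a simple, quick way to run through my db models.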
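To make the cost difference concrete, here is a minimal sketch that counts calls against a stand-in datastore. `FakeDatastore`, `save_one_by_one`, and `save_in_batches` are hypothetical names invented for illustration; the only thing borrowed from the real API is the fact that `db.put` accepts either a single entity or a list.

```python
class FakeDatastore(object):
    """Hypothetical stand-in for the datastore; counts round trips."""

    def __init__(self):
        self.calls = 0
        self.stored = []

    def put(self, items):
        # Like db.put, accept one entity or a list of entities.
        if not isinstance(items, list):
            items = [items]
        self.calls += 1
        self.stored.extend(items)


def save_one_by_one(store, entities):
    # The tutorial pattern: one datastore call per loop iteration.
    for entity in entities:
        store.put(entity)


def save_in_batches(store, entities, batch_size=100):
    # Collect entities and flush them batch_size at a time.
    pending = []
    for entity in entities:
        pending.append(entity)
        if len(pending) >= batch_size:
            store.put(pending)
            pending = []
    if pending:  # flush the remainder
        store.put(pending)


entities = list(range(250))

naive = FakeDatastore()
save_one_by_one(naive, entities)

batched = FakeDatastore()
save_in_batches(batched, entities, batch_size=100)

print(naive.calls, batched.calls)
```

With 250 entities and a batch size of 100, the loop-and-save version makes 250 calls while the batched version makes 3.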
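What would it look like?
Usage
- Import the class
- Tell it which model you want to map over
- Give it optional query filters
- Get the iterator object
- Loop away, safe in the knowledge that you are not making thousands of unnecessary db calls.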
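Behind the scenes
This is where I need your help, because I feel out of my depth.
A generator object (I have never used generators and only sort of understand them) grabs datastore items in batches (how many is it safe to grab? Is there a hard limit, or does it depend on item size?) and presents them in an iterable fashion. Once the MAX_AMOUNT batch_size is reached, it saves the batch to the datastore and seamlessly grabs the next batch (using a cursor).
One thing I am considering is using defer to save the items to the db, the intention being to save some time when looping through many items. The possible downside is that the next piece of code may expect the map to have completed. So I think it would be good to have a 'defer_db' flag that is set or left unset depending on the user's preference. If you only expect a small number of items, you would not set the defer flag.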
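The batch-and-cursor idea described above can be sketched without App Engine at all. In this rough illustration, `FakePagedSource` and `iter_in_batches` are hypothetical names, and the integer cursor is an assumption standing in for a real datastore cursor.

```python
class FakePagedSource(object):
    """Hypothetical stand-in for a query: fetch(limit, cursor) returns a page
    plus the cursor at which the next page starts."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.fetch_calls = 0

    def fetch(self, limit, cursor=0):
        self.fetch_calls += 1
        page = self._rows[cursor:cursor + limit]
        return page, cursor + len(page)


def iter_in_batches(source, batch_size=100):
    """Yield rows one at a time while fetching them batch_size at a time."""
    page, cursor = source.fetch(batch_size)
    while page:
        for row in page:
            yield row
        if len(page) < batch_size:
            break  # a short page means nothing is left; skip the extra fetch
        page, cursor = source.fetch(batch_size, cursor)


source = FakePagedSource(range(25))
rows = list(iter_in_batches(source, batch_size=10))
print(len(rows), source.fetch_calls)
```

The caller sees a flat stream of 25 rows, while the source is only asked for 3 pages. The short-page check saves one fetch whenever the total is not an exact multiple of the batch size.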
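Conclusion
Please contribute code concepts to this little project. The accepted answer will be the one with the most upvotes after a week. Admittedly, I feel a bit dirty having SO come up with a solution for me, but truthfully I don't feel up to the task. I hope you find it useful.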
Same query functions
country_mim = MIM(CountryModels.all()).filter("spoken_language =", "French")
country_mim.order("population")
Nested iteration
some_mim = MIM(SomeModel.all())
for x in some_mim.iterall():
    if x.foo == 'ham sandwich':
        sandwich_mim = MIM(MySandwiches.all())
        for sandwich in sandwich_mim.iterall():
            if 'ham' in sandwich.ingredients:
                print 'yay'
Batch saving & deleting
country_mim = MIM(CountryModels.all()).order("drinking_age")
for country in country_mim.iterall():
    if country.drinking_age > 21:  # these countries should be nuked from orbit
        country_mim.bdel(country)  # delete
    if country.drinking_age == 18:
        country.my_thoughts = "god bless you foreigners"
        country_mim.bput(country)  # save
    if country.drinking_age < 10:  # panic
        country.my_thoughts = "what is this i don't even..."
        country_mim.bput(country)
        break  # even though we panicked, the bput still resolves
MiniIterMapper.py
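I have been using this code for a few weeks now and everything seems fine. Defer is not included. The query facade code was stolen (with permission) from the great PagedQuery module. Supports batch saving and batch deleting.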
import google.appengine.ext.db as db
from google.appengine.ext.deferred import defer


class MIM(object):
    """
    All standard Query functions (filter, order, etc) supported*. Default batch
    size is 100. defer_db=True will cause put and delete datastore operations to
    be deferred. allow_func accepts any function you wish and only the entities
    that cause the function to return a true value will be returned during
    iterall(). Using break/continue/return while iterating doesn't cause things
    to explode (like it did in the 1st version).
    * - thanks to http://code.google.com/p/he3-appengine-lib/wiki/PagedQuery
    """

    def __init__(self, query, batch_size=100, defer_db=False, allow_func=None):
        self._query = query
        self._batch_size = batch_size
        self._defer_db = defer_db
        self._allow_func = allow_func
        self._to_save = []
        self._to_delete = []
        # find out if we are dealing with another facade object
        if '_query' in query.__dict__:
            query_to_check = query._query
        else:
            query_to_check = query
        if isinstance(query_to_check, db.Query):
            self._query_type = 'Query'
        elif isinstance(query_to_check, db.GqlQuery):
            self._query_type = 'GqlQuery'
        else:
            raise TypeError('Query type not supported: ' + type(query).__name__)

    def iterall(self):
        "Return iterable over all datastore items matching query. Items pulled from db in batches."
        results = self._query.fetch(self._batch_size)  # init query
        savedCursor = self._query.cursor()  # init cursor
        try:
            while results:
                for item in results:
                    if self._allow_func:
                        if self._allow_func(item):
                            yield item
                    else:
                        yield item
                if len(results) == self._batch_size:
                    results = self._query.with_cursor(savedCursor).fetch(self._batch_size)
                    savedCursor = self._query.cursor()
                else:  # avoid additional db call if we don't have max amount
                    results = []  # while loop will end, and go to else section.
            else:
                self._finish()
        except GeneratorExit:
            # raised at the yield when the caller abandons the loop early
            self._finish()

    def bput(self, item):
        "Batch save."
        self._to_save.append(item)
        if len(self._to_save) >= self._batch_size:
            self._bput_go()

    def bdel(self, item):
        "Batch delete."
        self._to_delete.append(item)
        if len(self._to_delete) >= self._batch_size:
            self._bdel_go()

    def _bput_go(self):
        if self._defer_db:
            defer(db.put, self._to_save)
        else:
            db.put(self._to_save)
        self._to_save = []

    def _bdel_go(self):
        if self._defer_db:
            defer(db.delete, self._to_delete)
        else:
            db.delete(self._to_delete)
        self._to_delete = []

    def _finish(self):
        "When done iterating through models, could be that the last few remaining weren't put/deleted yet."
        if self._to_save:
            self._bput_go()
        if self._to_delete:
            self._bdel_go()

    # FACADE SECTION >>>
    def fetch(self, limit, offset=0):
        return self._query.fetch(limit, offset)

    def filter(self, property_operator, value):
        self._check_query_type_is('Query')
        self._query = self._query.filter(property_operator, value)
        return self

    def order(self, property):
        self._check_query_type_is('Query')
        self._query.order(property)
        return self

    def ancestor(self, ancestor):
        self._check_query_type_is('Query')
        self._query.ancestor(ancestor)
        return self

    def count(self, limit=1000):
        return self._query.count(limit)

    def _check_query_type_is(self, required_query_type):
        if self._query_type != required_query_type:
            raise TypeError('Operation not allowed for query type ('
                            + type(self._query).__name__ + ')')
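The `except GeneratorExit` branch in `iterall()` is what lets `break` still flush pending writes: when a suspended generator is closed, `GeneratorExit` is raised at the `yield`. Below is a small standalone sketch of that mechanism. It uses `try`/`finally` instead of the `else`/`except` pair, which is my own substitution and not what the class above does; `iter_with_cleanup`, `flushed`, and `pending` are hypothetical names.

```python
flushed = []


def iter_with_cleanup(items, pending):
    """Yield items; flush whatever is in `pending` when iteration ends,
    whether it ran to completion or was closed early."""
    try:
        for item in items:
            yield item
    finally:
        # Runs on normal exhaustion and on GeneratorExit (close()).
        flushed.extend(pending)
        del pending[:]


pending = []
gen = iter_with_cleanup([1, 2, 3, 4], pending)
for item in gen:
    pending.append(item * 10)  # stand-in for bput(item)
    if item == 2:
        break  # leave early; the generator is still suspended at yield

gen.close()  # explicit close raises GeneratorExit inside the generator
print(flushed)
```

In the usage examples the generator is never bound to a name, so CPython closes it as soon as the `for` loop is abandoned. Other interpreters do not guarantee that timing, so keeping a reference and calling `close()` explicitly is probably the safer habit.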
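Why don't you want to use mapreduce? It is designed for exactly this use case, already does everything you want, and can be invoked programmatically. "Lightweight" is a very vague term, but I am not aware of any reason why the mapreduce library isn't exactly suited to your task, and there is very little reason to duplicate that functionality.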