为什么迭代ndb数据存储查询会消耗太多内存?



我有一个这样的查询:

query = HistoryLogs.query()
query = query.filter(HistoryLogs.exec_id == exec_id)
iter = query.iter()
for ent in iter:
# write log to file, nothing memory intensive

我在 for 循环中添加了日志,读取 10K 行会增加 200MB的内存使用量,然后读取接下来的 10K 行会增加额外的 200MB,依此类推。读取 100K 需要 2GB,这超出了 highmem 内存限制。

在读取 10K 行后,我尝试通过添加以下内容来清除 for 循环中的 memcache:

# clear ndb cache in order to reduce memory footprint
context = ndb.get_context()
context.clear_cache()

在 for 循环中,在每次 10K 次迭代中,但它导致查询超时,引发错误BadRequestError: The requested query has expired. Please restart it with the last cursor to read more results. ndb

我最初的期望是,通过使用query.iter()而不是query.fetch(),我不会遇到任何内存问题,并且内存几乎是恒定的,但事实并非如此。有没有办法使用迭代器读取数据,而不会超过时间和内存限制?通过清除上下文缓存,我看到内存消耗几乎是恒定的,但是我在检索所有行所需的时间方面遇到了麻烦。

顺便说一句,有很多行要检索,最多 150K。是否有可能通过一些简单的调整来完成这项工作,或者我需要一个更复杂的解决方案,例如使用一些并行化的解决方案?

您是否在远程 api-shell 中运行它?否则,我想应用程序引擎的最大请求超时将开始成为一个问题。

你绝对应该在谷歌数据流中运行它。它将为您并行化/运行得更快。

https://beam.apache.org/documentation/programming-guide/https://beam.apache.org/releases/pydoc/2.17.0/index.html https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py

我想你的pipline代码看起来像这样:

def run(project, gcs_output_prefix, exec_id):
def format_result(element):
csv_cells = [
datastore_helper.get_value(element.properties['exec_id']),
# extract other properties here!!!!!
]
return ','.join([str(cell) for cell in csv_cells])
pipeline_options = PipelineOptions([])
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
query = query_pb2.Query()
query.kind.add().name = 'HistoryLogs'
datastore_helper.set_property_filter(query.filter, 'exec_id', PropertyFilter.EQUAL, exec_id)
_ = (p 
| 'read' >> ReadFromDatastore(project, query, None)
| 'format' >> beam.Map(format_result)
| 'write' >> beam.io.WriteToText(file_path_prefix=gcs_output_prefix,
file_name_suffix='.csv',
num_shards=1) # limits output to a single file
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run(project='YOUR-PROJECT-ID', 
gcs_output_prefix='gs://YOUR-PROJECT-ID.appspot.com/history-logs-export/some-time-based-prefix/',
exec_id='1234567890')

此代码从 Google Datastore 读取,并以 csv 形式导出到 Google Cloud Storage。

在对从数据存储查询获取的大量实体进行操作时,防止达到内存和请求处理时间限制的典型解决方案是使用游标将工作负载拆分为多个可管理的块,并将它们分布在多个请求中(例如使用推送队列任务(,最终在时间上交错,以防止实例爆炸和访问输出媒体(如果有(时的争用。

通过这种方式,您可以处理几乎无限的工作负载,即使出于某种原因,您不能/不会使用 Alex 建议的不错的数据流解决方案。

您可以在如何从谷歌数据存储中删除所有条目中找到该技术的示例?

但请注意游标限制,请参阅 GAE 数据存储游标是否永久且持久?

最新更新