如何使用mapreduce批量更新满足查询的数据存储实体



我想使用 mapreduce 库来更新所有满足查询的实体。有几个并发症:

  1. 查找要更新的实体的查询检查特定属性 "property1" 包含在一长串值 (~10000条目)来自 CSV 文件
  2. 对于满足查询的每个实体,需要更新另一个属性"property2",使其等于 csv 文件的第二列和同一行中的值

我知道如何将 csv 文件上传到 Blobstore 并使用 Blobstore 输入读取器读取每一行。我还知道使用查询获取实体的数据存储输入读取器。

我的问题是,如何创建一个映射器类,该类从 Blobstore 读取输入数据,获取数据存储实体并尽可能高效地更新它们?

鉴于 property1 的可能值列表很长,使用查询进行筛选似乎不是一个好的选择(因为您需要使用 IN 筛选器,它实际上为每个值运行一个查询)

使用 MR 的

替代方法是使用 Map 将 CSV 加载到内存中(从属性 1 加载到属性 2),然后触发迭代所有实体的 MR 作业,如果其属性 1 是映射上键的一部分,请使用映射值对其进行修改。

正如 @Ryan B 所说,如果您只想利用批量放置,则无需为此使用 MR,因为您可以使用 Iterable 来使用数据存储服务进行放置。

您可以使用 DatastoreInputReader,并在映射函数中找出属性 1 是否实际上在 csv 中: 每次从 csv 读取会非常慢,您可以做的是在从它自己的数据存储模型中读取一次后使用 memcache 提供该信息。要填充数据存储模型,我建议使用 property1 作为每行的自定义 ID,这样查询就很简单了。您只需更新那些实际更改的值的数据存储,并使用突变池来提高其性能 (op.db.Put())。我给你留下伪代码(对不起...我只有python)的不同部分是什么样子的,我进一步建议您阅读Google App Engine上的Mapreduce上的这篇文章:http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/

#to get the to_dict method
from google.appengine.ext import ndb
from mapreduce import operation as op 
from mapreduce.lib import pipeline
from mapreduce import mapreduce_pipeline
class TouchPipeline(pipeline.Pipeline):
    """
    Pipeline to update the field of entities that have certain condition
    """
    def run(self, *args, **kwargs):
        """ run """
        mapper_params = {
            "entity_kind": "yourDatastoreKind",
        }
        yield mapreduce_pipeline.MapperPipeline(
            "Update entities that have certain condition",
            handler_spec="datastore_map",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=mapper_params,
            shards=64)

class csvrow(ndb.Model):
  #you dont store property 1 because you are going to use its value as key
  substitutefield=ndb.StringProperty()
def create_csv_datastore():
  # instead of running this, make a 10,000 row function with each csv value, 
  # or read it from the blobstore, iterate and update the values accordingly
  for i in range(10000):
    #here we are using our own key as id of this row and just storing the other column that
    #eventually will be subtitute if it matches
    csvrow.get_or_insert('property%s' % i, substitutefield = 'substitute%s').put()

def queryfromcsv(property1):
  csvrow=ndb.Key('csvrow', property1).get()
  if csvrow:
    return csvrow.substitutefield
  else:
    return property1
def property1InCSV(property1):
  data = memcache.get(property1)
  if data is not None:
      return data
  else:
      data = self.queryfromcsv(property1)
      memcache.add(property1, data, 60)
      return data
def datastore_map(entity_type):
  datastorepropertytocheck = entity_type.property1
  newvalue = property1InCSV(datastorepropertytocheck)
  if newvalue!=datastoreproperty:
    entity_type.property11 = newvalue
    #use the mutation pool
    yield op.db.Put(entity)

相关内容

  • 没有找到相关文章

最新更新