在应用引擎中使用MapReduce创建GROUP BY



我正在寻找一种使用MapReduce在数据存储中的查询中进行GROUP BY操作的方法。AFAIK应用引擎在GQL中不支持GROUP BY本身,其他开发人员建议的一个好方法是使用MapReduce。

我下载了源代码,正在研究演示代码,并尝试在我的案例中实现。但我没有成功。以下是我的尝试。也许我所做的一切都错了。所以,如果有人能帮我做到这一点,我会感谢的。


我想做的是:我在数据存储中有一堆联系人,每个联系人都有一个日期。有一堆日期相同的重复联系人。我想做的是简单地分组,收集相同日期的相同联系人。

例如:

假设我有这样的联系人:

  1. 联系人:Foo1 |日期:2012年10月1日
  2. 联系人:Foo2 |日期:2012年5月2日
  3. 联系人:Foo1 |日期:2012年10月1日

所以在MapReduce操作之后,它会是这样的:

  1. 联系人:Foo1 |日期:2012年10月1日
  2. 联系人:Foo2 |日期:2012年5月2日

对于GROUP BY功能,我认为字数就可以了。


编辑

日志中唯一显示的是:

/mapreduce/ppipeline/run 200

运行GetContactData.WordCountPipeline((u'2012-02-02',),*{})#da26a9b555e311e19b1e6d324d450c1a

结束编辑

如果我做错了什么,如果我使用错误的方法用MapReduce进行GROUP BY,请帮助我如何用MapReduze进行。


这是我的代码:

from Contacts import Contacts
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.api import mail
from google.appengine.ext.db import GqlQuery
from google.appengine.ext import db

from google.appengine.api import taskqueue
from google.appengine.api import users
from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce import shuffler
import simplejson, logging, re

class GetContactData(webapp.RequestHandler):
    # Get the calls based on the user id
    def get(self):
        contactId = self.request.get('contactId')
        query_contacts = Contact.all()
        query_contacts.filter('contact_id =', int(contactId))
        query_contacts.order('-timestamp_')
        contact_data = []
        if query_contacts != None:
            for contact in query_contacts:
                    pipeline = WordCountPipeline(contact.date)
                    pipeline.start()
                    record = { "contact_id":contact.contact_id,
                               "contact_name":contact.contact_name,
                               "contact_number":contact.contact_number,
                               "timestamp":contact.timestamp_,
                               "current_time":contact.current_time_,
                               "type":contact.type_,
                               "current_date":contact.date }
                    contact_data.append(record)
        self.response.headers['Content-Type'] = 'application/json'
        self.response.out.write(simplejson.dumps(contact_data)) 
class WordCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Word count demo.
  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """
  def run(self, date):
    output = yield mapreduce_pipeline.MapreducePipeline(
        "word_count",
        "main.word_count_map",
        "main.word_count_reduce",
        "mapreduce.input_readers.DatastoreInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "date": date,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=16)
    yield StoreOutput("WordCount", output)
class StoreOutput(base_handler.PipelineBase):
  """A pipeline to store the result of the MapReduce job in the database.
  Args:
    mr_type: the type of mapreduce job run (e.g., WordCount, Index)
    encoded_key: the DB key corresponding to the metadata of this job
    output: the blobstore location where the output of the job is stored
  """
  def run(self, mr_type, output):
      logging.info(output) # here I should append the grouped duration in JSON

我基于这个问题中提供的代码@autumngard,并根据我的目的进行了修改,它成功了。

相关内容

  • 没有找到相关文章

最新更新