MongoDB增量式mapReduce,只选择在上次mapReduce之后添加的新文档



假设我有一个文档集合,看起来像这样(只是一个简化的例子,但它应该显示方案):

> db.data.find()
{ "_id" : ObjectId("4e9c1f27aa3dd60ee98282cf"), "type" : "A", "value" : 11 }
{ "_id" : ObjectId("4e9c1f33aa3dd60ee98282d0"), "type" : "A", "value" : 58 }
{ "_id" : ObjectId("4e9c1f40aa3dd60ee98282d1"), "type" : "B", "value" : 37 }
{ "_id" : ObjectId("4e9c1f50aa3dd60ee98282d2"), "type" : "B", "value" : 1 }
{ "_id" : ObjectId("4e9c1f56aa3dd60ee98282d3"), "type" : "A", "value" : 85 }
{ "_id" : ObjectId("4e9c1f5daa3dd60ee98282d4"), "type" : "B", "value" : 12 }

现在我需要收集该集合的一些统计信息。例如:

db.data.mapReduce(function(){
        emit(this.type,this.value);
     },function(key,values){
        var total = 0;
        for(i in values) {total+=values[i]};
        return total;
     },
{out:'stat'})

将在'stat'收集中收集总数。

> db.stat.find()
{ "_id" : "A", "value" : 154 }
{ "_id" : "B", "value" : 50 }

在这一点上,一切都很完美,但我一直停留在下一步:

  1. 'data'集合不断更新新数据(旧文件保持不变,只插入,不更新)
  2. 我想定期更新'stat'集合,但不想每次都查询整个'data'集合,所以我选择运行增量mapReduce
  3. 在每次插入'data'集合时更新'stat'集合,不使用mapReduce,这似乎很好,但实际情况比这个例子更复杂,我只想按需获取统计数据。
  4. 要做到这一点,我应该能够查询文档,这是添加后,我的最后一个mapReduce
  5. 据我所知,我不能依赖于ObjectId属性,只是存储最后一个,然后选择每个文档与ObjectId>存储,因为ObjectId不等于SQL数据库中的自动增量id(例如不同的分片会产生不同的ObjectId)。
  6. 我可以改变ObjectId生成器,但不确定如何在分片环境中做得更好

所以问题是:

是否有办法只选择文档,在最后一个mapReduce之后添加到运行增量mapReduce,或者可能有另一种策略来更新不断增长的集合上的统计数据?

您可以缓存时间并将其用作下一次增量map-reduce的屏障。

我们正在工作中测试这个,它似乎是有效的。纠正我,如果我错了,但你不能安全地做map-reduce,而插入是跨碎片发生。版本变得不一致,您的map-reduce操作将失败。(如果你找到解决这个问题的方法,请让我知道!:)

我们使用大容量插入,每5分钟一次。一旦所有的批量插入完成,我们像这样运行map-reduce(在Python中):

m = Code(<map function>)
r = Code(<reduce function>)
# pseudo code
end = last_time + 5 minutes
# Use time and optionally any other keys you need here
q = bson.SON([("date" : {"$gte" : last_time, "$lt" : end})])
collection.map_reduce(m, r, out=out={"reduce": <output_collection>}, query=q)

注意,我们使用reduce而不是merge,因为我们不想覆盖我们之前的;我们想用相同的reduce函数将旧的结果和新的结果结合起来

您可以使用_id.getTime()(从:http://api.mongodb.org/java/2.6/org/bson/types/ObjectId.html)获得ID的时间部分。这应该是可在所有分片中排序的。

编辑:对不起,那是java文档…JS版本似乎是_id.generation_time.in_time_zone(Time.zone),从http://mongotips.com/b/a-few-objectid-tricks/

我编写了一个完整的基于pymon的解决方案,它使用增量map-reduce和缓存时间,并期望在cron作业中运行。它锁住了自己,所以两个线程不能并发运行:

https://gist.github.com/2233072

""" This method performs an incremental map-reduce on any new data in 'source_table_name' 
into 'target_table_name'.  It can be run in a cron job, for instance, and on each execution will
process only the new, unprocessed records.  
The set of data to be processed incrementally is determined non-invasively (meaning the source table is not 
written to) by using the queued_date field 'source_queued_date_field_name'. When a record is ready to be processed, 
simply set its queued_date (which should be indexed for efficiency). When incremental_map_reduce() is run, any documents 
with queued_dates between the counter in 'counter_key' and 'max_datetime' will be map/reduced.
If reset is True, it will drop 'target_table_name' before starting.
If max_datetime is given, it will only process records up to that date.
If limit_items is given, it will only process (roughly) that many items. If multiple
items share the same date stamp (as specified in 'source_queued_date_field_name') then
it has to fetch all of those or it'll lose track, so it includes them all. 
If unspecified/None, counter_key defaults to counter_table_name:LastMaxDatetime.
"""

我们使用'normalized' ObjectIds来解决这个问题。我们正在做的步骤:

  1. normalize id -从当前/存储/最后处理的id中获取时间戳并设置其他id的最小值部分。c#代码:new ObjectId(objectId.Timestamp, 0, short.MinValue, 0)
  2. 使用所有具有id的项目运行map-reduce大于我们的规范化id,跳过已经处理的项。
  3. 存储最后处理的id,并标记所有处理的项目。

注:部分边界项将被多次处理。为了解决这个问题,我们在处理的项目中设置了某种标志。

最新更新