我有一个数据问题,我想增量聚合。
我有设备(很多,存储在设备集合中),它发出存储在记录集合中的db中的度量(不定期)。每条记录都有一个timestamp_utc,它不是存储数据的时间戳,而是度量度量的位置。(完全不同,因为设备发送了一堆测量值)
我想要的是批量数据聚合(Map Reduce)与不同规模的每一个不同的设备增量。例如,我想要一个平均每5分钟测量一次的集合,每30分钟测量一次……每天等等……我不想每次都处理整个数据,而只想处理新条目。
示例:记录集合包含:
{ _id : {device1, time : ISODate(2011-10-12T13:50:01Z)}, value : { meas1 : 2, meas2 : 4}},
{ _id : {device1, time : ISODate(2011-10-12T13:51:01Z)}, value : { meas1 : 1, meas2 : 6}},
{ _id : {device2, time : ISODate(2011-10-12T13:49:01Z)}, value : { meas1 : 3, meas2 : 7}},
{ _id : {device2, time : ISODate(2011-10-12T13:50:01Z)}, value : { meas1 : 4, meas2 : 8}},
{ _id : {device2, time : ISODate(2011-10-12T13:51:01Z)}, value : { meas1 : 5, meas2 : 9}},
在scale1上聚合数据后(每5分钟),我将得到类似
的内容{ _id : {device1, time : ISODate(2011-10-12T13:50:00Z)}, value : { meas1 : 1.5, meas2 : 5}},
{ _id : {device2, time : ISODate(2011-10-12T13:45:00Z)}, value : { meas1 : 3, meas2 : 7}},
{ _id : {device2, time : ISODate(2011-10-12T13:50:00Z)}, value : { meas1 : 4.5, meas2 : 8.5}},
等每次销售。第一次生成这些数据自然不是什么大事,它是一个非常常见的map/reduce操作。
db.record.mapReduce(map, reduce, {finalize : finalize, out : { merge : db.recordscale1 }});
当我想做增量映射缩减时,问题来了。我确实想做这样的事情
db.record.mapReduce(map, reduce, {query : { "_id.time_utc" : { $gte : timeMin } }, finalize : finalize, out : { merge : db.recordscale1 }});
问题是每个设备的timeMin是不同的,它应该对应于输出集合中插入的最后一个条目(按time_utc排序)。
我尝试了很多不成功的技术(使用insertion_time…)。
有人知道我该怎么处理吗?
我想避免在每个设备上执行一个mapreduce,因为它太低了。
我最后添加了一个带有插入时间戳的新字段。
对于每个map reduce调用,我得到reduce集合中最后处理的时间戳,我调用map reduce,并基于插入的时间戳进行查询,以避免重复处理。
如果有人有更好的主意,我很感兴趣;-)