我目前使用MongoDB 2.6通过MongoHQ。我有几个mapreduce作业,它们处理来自集合(c1)的原始数据以生成新集合(c2)。我还有一个聚合管道,它通过解析(c2)来生成一个新的集合(c3),并使用强大的$out操作符。
然而,我需要添加额外的字段(c3)以外的聚合管道,并保持它们,即使在聚合的新运行后,但似乎聚合,基于_id键只是覆盖内容而不更新它。因此,如果我之前添加了一个额外的字段,如foo: 'bar'到(c3),我重新运行聚合,我将失去foo字段。
基于文档(http://docs.mongodb.org/manual/reference/operator/aggregation/out/#pipe._S_out)
替换现有集合
如果$out操作指定的集合已经存在,那么在聚合完成后, $out阶段将自动用新的结果集合替换现有的集合。$out操作不会更改前一个集合上存在的任何索引。如果聚合失败,$out操作不会更改预先存在的集合。
是否有更好的方法或棘手的一个:-)更新$out集合,而不是覆盖相同_id的记录?我可以写一个python脚本或javascript来完成这项工作,但我想避免做很多数据库调用,并以一种更聪明的方式聚合。也许这是不可能的,所以我将寻找一个不同的和更"经典"的路径。
感谢您的帮助
好吧,不是直接使用 $out
操作符,而是直接使用 mapReduce
输出,这几乎是一个"覆盖"操作(尽管mapReduce确实有"合并"one_answers"减少"模式)。
但是因为你有一个MongoDB 2.6版本,你实际上返回一个"游标"。因此,虽然"客户端/服务器"交互可能不像您希望的那样最佳,但您也有"批量更新"操作,因此您可以按照以下行做一些事情:
var cursor = db.collection.aggregate([
// pipeline here
]);
var batch = [];
while ( cursor.hasNext() ) {
var doc = cursor.next();
var updoc = {
"q": { "_id": doc._id },
"u": {
// only new fields except for
"$setOnInsert": {
// the fields you expect to add from before
},
"upsert": true
}
};
batch.push(updoc);
// try to do sensible under 16MB updates, number may vary
if ( ( batch.length % 500 ) == 0 ) {
db.runCommand({
"update": "newcollection",
"updates": batch
});
batch = []; // reset the content
}
}
db.runCommand({
"update": "newcollection",
"updates": batch
});
当然,尽管会有很多反对者,而且不是没有理由的,因为你真的需要权衡后果(这是非常真实的),你总是可以用 db.eval()
包装本质上是一个JavaScript调用,以获得完整的服务器端执行。
但是在可能的情况下(除非您有一个完全远程的数据库解决方案),通常建议采用"客户机/服务器"选项,但保持进程尽可能"靠近"(在网络术语中)服务器。
与Map reduce不同,聚合框架中的$out
操作符似乎有一组非常具体的预定义行为(http://docs.mongodb.org/manual/reference/operator/aggregation/out/#behaviors),然而,$out
选项似乎可能会改变,我没有找到与此特定情况相关的JIRA,但其他人已经发布了更改(https://jira.mongodb.org/browse/SERVER-13201)。
至于现在解决你的问题,你要么被迫返回到Map Reduce(我不知道它是从哪里运行的场景),要么以某种方式聚合,允许你输入新数据和你需要的旧数据。
实现这一目标的最常见方法可能是使用新数据更新原始行,可能是通过将原始行聚合回其本身。
感谢大家的留言。由于我不想使用游标(请求消耗),我尝试通过组合2个map reduce作业和一个聚合来获得作业。这是相当"胖",但它的工作,可以给别人一些想法。当然,我将非常高兴听到你的其他很好的选择。
因此,我有一个集合c1,这是您可以通过值对象看到的先前mapreduce作业的结果。c1:{id:'xxxx',值:{语言:'…’,关键词:‘…',参数:'…', field1: val1, field2: val2}}xxxx唯一ID键为该值的拼接。语言,价值。关键字和值。参数如下:*xxxx = _*
我有另一个集合c2: {_id: ObjectID,语言:'…",关键字:"……', field1: val1, field2: val2, labels: 'yyyyy'}这是相当一个c1集合的投影,但有一个额外的字段标签,这是一个字符串,不同的标签,逗号分隔。这个c2集合是语言和关键字及其附加字段值的所有组合的中央存储库。
目标是对c1集合中的所有记录进行分组组键_,进行一些计算其他字段,并将结果存储到c2集合中,但保留旧的'labels'字段从c2中使用相同的键。所以fields1 &2的每次启动整个集合时,都会重新计算这个c2集合批处理,但标签字段将保持不变。
正如我在第一条消息中所描述的,通过使用聚合或mapreduce作业,您无法达到此目标,因为'labels'字段将被删除。
因为我不想使用游标和其他foreach循环,这是非常网络和数据库请求消耗(我有一个大的集合,我使用一个MongoHQ服务)我尝试使用mapreduce和聚合作业来解决这个问题。
一期
所以,首先我运行一个mapreduce作业(m1),这是c2集合的一种副本,但清除field1 &2到0。结果将存储在c3集合中。
function m1Map(){
language = this['value']['language'];
keyword = this['value']['keyword'];
labels = this['labels'];
key = language + '_' + keyword;
emit(key,{'language':language,'keyword':keyword,'field1': 0, 'field2': 0.0, 'labels' : labels});
}
function m1Reduce(key,values){
language = values[0]['language'];
keyword = values[0]['keyword'];
labels = values[0]['labels'];
return {'language':language,'keyword':keyword,'field1': 0, 'field2': 0.0, 'labels' : labels}};
}
现在,c3是c2 collection的副本,field1&2设置为0。这是这个系列的形状: c3: { id: ,价值:{语言:"……’,关键词:‘…",field1: 0, field2: 0.0,标签 : '...'}}
二期
在第二步中,我运行一个mapreduce作业(m2),它按键_对c1集合值进行分组,并在我的示例中投影一个带有固定值'x'的额外字段'labels'。这个'x'值永远不会在c2集合上使用,它是一个特殊值。这个m2 mapreduce作业的输出将存储在与之前相同的c3集合中,并在out指令中使用'reduce'选项。python脚本将被进一步描述。
function m2Map(){
language = this['value']['language'];
keyword = this['value']['keyword'];
field1 = this['value']['field1'];
field2 = this['value']['field2'];
key = language + '_' + keyword;
emit(key,{'language':language,'keyword':keyword,'field1': field1, 'field2': field2, 'labels' : 'x'});
}
然后我对Reduce函数做了一些计算:
function m2Reduce(key,values){
// Init
language = values[0]['language'];
keyword = values[0]['keyword'];
field1 = 0;
field2 = 0;
bLabel = 0;
for (var i = 0; i < values.length; i++){
if (values[i]['labels'] == 'x') {
// We know these emit values are coming from the map and not from previous value on the c2 collection
// 'x' is never used on the c2 collection
field1 += parseInt(values[i]['field1']);
field2 += parseFloat(values[i]['field2']);
} else {
// these values are from the c2 collection
if (bLabel == 0) {
// we keep the former value for the 'labels' field
labels = values[i]['labels'];
bLabel = 1;
} else {
// we concatenate the 'labels' field if we have 2 records but theorytically it is impossible as c2 has only one record by unique key
// anyway, a good check afterwards :-)
labels += ','+values[i]['labels'];
}
}
}
if (bLabel == 0) {
// if values are only coming from the map emit, we force again the 'x' value for labels, it these values are re-used in another reduce call
labels = 'x';
}
return {'language':language,'keyword':keyword, 'field1': field1, 'field2': field2, 'labels' : labels};
}
Python mapreduce脚本调用两个m1 &M2 mapreduce作业(参见pymongo的导入:http://api.mongodb.org/python/2.7rc0/installation.html)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymongo import MongoClient
from pymongo import MongoReplicaSetClient
from bson.code import Code
from bson.son import SON
# MongoHQ
uri = 'mongodb://user:passwd@url_node1:port,url_node2:port/mydb'
client = MongoReplicaSetClient(uri,replicaSet='set-xxxxxxx')
db = client.mydb
coll1 = db.c1
coll2 = db.c2
#Load map and reduce functions
m1_map = Code(open('m1Map.js','r').read())
m1_reduce = Code(open('m1Reduce.js','r').read())
m2_map = Code(open('m2Map.js','r').read())
m2_reduce = Code(open('m2Reduce.js','r').read())
#Run the map-reduce queries
results = coll2.map_reduce(m1_map,m1_reduce,"c3",query={})
results = coll1.map_reduce(m2_map,m2_reduce,out=SON([("reduce", "c3")]),query={})
<<h1> 第三阶段/strong> 此时,我们有一个c3集合,它包含了所有字段1 &2计算值并保留标签。因此,现在,我们必须运行最后一个聚合管道,将c3内容(以具有复合值的mapreduce形式)复制到更经典的集合c2,该集合具有平坦字段,不包含值对象。
db.c3.aggregate([{$project : { _id: 0, keyword: '$value.keyword', language: '$value.language', field1: '$value.field1', field2 : '$value.field2', labels : '$value.labels'}},{$out:'c2'}])
!目标达到了。这个解决方案很长,有2个mapreduce作业和一个聚合管道,但对于那些不想使用消费游标或外部循环的人来说,这是一个可选的解决方案。
谢谢。