我有一个文档集合,其文档结构(schema)如下(部分字段已省略):
{
"_id" : ObjectId("539f41a95d1887b57ab78bea"),
"answers" : {
"ratings" : {
"positivity" : [
2,
3,
5
],
"activity" : [
4,
4,
3
],
}
},
"media" : [
ObjectId("537ea185df872bb71e4df270"),
ObjectId("537ea185df872bb71e4df275"),
ObjectId("537ea185df872bb71e4df272")
]
}
在此结构中,positivity 的第一、第二和第三个评分分别对应于 media 数组中的第一、第二和第三个条目,activity 评分也是如此。我需要针对集合中的所有文档,按各评分所关联的 media 对象,计算 positivity 和 activity 评分的统计信息。目前,我用下面的 MapReduce 设置只对第一个条目进行统计:
/**
 * Map step: emits the seed statistics for the FIRST media entry of a document.
 *
 * Invoked with `this` bound to the current document. The emitted key is the
 * `str` property of `this.media[0]`; the emitted value holds one single-sample
 * accumulator per rating type ({sum, min, max, count: 1, diff: 0}), which is
 * the shape reduceFunction merges and finalizeFunction post-processes.
 *
 * Documents that lack a first media entry or a first rating of either kind are
 * skipped instead of throwing a TypeError or emitting `undefined` values
 * (which would turn the reduced sums into NaN).
 */
var mapFunction = function() {
  var ratings = this.answers && this.answers.ratings;
  var media = this.media;

  // Guard: the limiting query only checks that the rating fields exist, not
  // that `media` is present or that any of the arrays are non-empty.
  if (!ratings || !ratings.activity || !ratings.positivity || !media) {
    return;
  }

  var firstMedia = media[0];
  var firstActivity = ratings.activity[0];
  var firstPositivity = ratings.positivity[0];
  // `== null` deliberately matches both null and undefined.
  if (firstMedia == null || firstActivity == null || firstPositivity == null) {
    return;
  }

  // A single observation: sum/min/max are the value itself and the sum of
  // squared deviations from the mean (diff) is zero.
  var activity = {
    sum: firstActivity,
    min: firstActivity,
    max: firstActivity,
    count: 1,
    diff: 0
  };
  var positivity = {
    sum: firstPositivity,
    min: firstPositivity,
    max: firstPositivity,
    count: 1,
    diff: 0
  };

  emit(firstMedia.str, {'activity': activity, 'positivity': positivity});
};
/**
 * Reduce step: merges all partial statistics emitted for one key.
 *
 * @param key    The key the values were emitted under (unused here).
 * @param values Array of {activity, positivity} objects, each holding a
 *               {sum, min, max, count, diff} accumulator, where `diff` is the
 *               running sum of squared deviations from the mean.
 * @returns      One {activity, positivity} object of the same shape, so the
 *               result can itself be fed back into another reduce pass.
 *
 * The same pairwise merge is applied to both rating types through a nested
 * helper, and the merge accumulates into a copy so the incoming `values`
 * array is left untouched.
 */
var reduceFunction = function(key, values) {
  // Merges the accumulators stored under `field` across all values.
  // Kept nested so the reduce function stays self-contained.
  function mergeStats(field) {
    var first = values[0][field];
    // Copy so the caller's objects are not modified.
    var acc = {
      sum: first.sum,
      min: first.min,
      max: first.max,
      count: first.count,
      diff: first.diff
    };
    for (var i = 1; i < values.length; i++) {
      var next = values[i][field];
      // Pairwise combination of two partial variances:
      // delta is (acc mean - next mean), weight is n_a * n_b / (n_a + n_b).
      var delta = acc.sum / acc.count - next.sum / next.count;
      var weight = (acc.count * next.count) / (acc.count + next.count);
      acc.diff += next.diff + delta * delta * weight;
      acc.sum += next.sum;
      acc.count += next.count;
      acc.min = Math.min(acc.min, next.min);
      acc.max = Math.max(acc.max, next.max);
    }
    return acc;
  }

  return {'activity': mergeStats('activity'), 'positivity': mergeStats('positivity')};
};
/**
 * Finalize step: derives mean, variance and standard deviation for both
 * rating types from the reduced {sum, count, diff} accumulators.
 *
 * The derived fields are written onto the accumulator objects inside `value`
 * and the same `value` object is returned. `diff` is divided by `count` for
 * the population variance and by `count - 1` for the sample variance.
 *
 * @param key   The key being finalized (unused here).
 * @param value The reduced {activity, positivity} object.
 * @returns     `value`, with the derived statistics added to each accumulator.
 */
var finalizeFunction = function(key, value) {
  var metrics = ['activity', 'positivity'];
  for (var m = 0; m < metrics.length; m++) {
    var stats = value[metrics[m]];
    var n = stats.count;
    stats.mean = stats.sum / n;
    stats.population_variance = stats.diff / n;
    stats.population_std = Math.sqrt(stats.population_variance);
    stats.sample_variance = stats.diff / (n - 1);
    stats.sample_std = Math.sqrt(stats.sample_variance);
  }
  return value;
};
// Restrict the job to documents in which both rating fields exist.
var limitingQuery = {
  'answers.ratings.activity': {$exists: true},
  'answers.ratings.positivity': {$exists: true}
};

// Run the map/reduce/finalize pipeline over db.trials; the `out` option
// targets the 'base_ratings' collection in the 'tmp' database with the
// `replace` action.
db.trials.mapReduce(mapFunction, reduceFunction, {
  query: limitingQuery,
  finalize: finalizeFunction,
  out: {replace: 'base_ratings', db: 'tmp'}
});
在只有少量文档的情况下,这一切都如我所料地正常工作。但当我对整个集合运行它时,就出现了一些奇怪的情况。首先,当我运行 db.currentOp() 时,得到如下输出:
{
"inprog" : [
{
"opid" : 2337,
"active" : true,
"secs_running" : 2787,
"microsecs_running" : NumberLong("2787597940"),
"op" : "query",
"ns" : "eim.trials",
"query" : {
"$msg" : "query not recording (too large)"
},
"planSummary" : "COLLSCAN",
"client" : "109.201.154.152:59939",
"desc" : "conn17",
"threadId" : "0x7ef89b022700",
"connectionId" : 17,
"locks" : {
"^" : "r",
"^eim" : "R"
},
"waitingForLock" : false,
"msg" : "m/r: (1/3) emit phase M/R: (1/3) Emit Progress: 8300/1 830000%",
"progress" : {
"done" : 8300,
"total" : 1
},
"numYields" : 1133,
"lockStats" : {
"timeLockedMicros" : {
"r" : NumberLong("5075753298"),
"w" : NumberLong(2274)
},
"timeAcquiringMicros" : {
"r" : NumberLong(243155328),
"w" : NumberLong(131)
}
}
},
{
"opid" : 2480,
"active" : true,
"secs_running" : 2111,
"microsecs_running" : NumberLong(2111502538),
"op" : "query",
"ns" : "eim.trials",
"query" : {
"$msg" : "query not recording (too large)"
},
"planSummary" : "COLLSCAN",
"client" : "109.201.154.192:61609",
"desc" : "conn23",
"threadId" : "0x7ef89ac1e700",
"connectionId" : 23,
"locks" : {
"^" : "r",
"^eim" : "R"
},
"waitingForLock" : false,
"msg" : "m/r: (1/3) emit phase M/R: (1/3) Emit Progress: 7952/1 795200%",
"progress" : {
"done" : 7952,
"total" : 1
},
"numYields" : 819,
"lockStats" : {
"timeLockedMicros" : {
"r" : NumberLong("3399905661"),
"w" : NumberLong(73184)
},
"timeAcquiringMicros" : {
"r" : NumberLong(406543723),
"w" : NumberLong(145)
}
}
}
]
}
为什么 Emit Progress 会大于 100%?我确定在这一切运行期间,没有额外的文档被添加到集合中。inprog.progress.done 为 8300,而 inprog.progress.total 却为 1。这是怎么回事?
更糟糕的是,这个操作最终会出错:
Error: error doing query: failed src/mongo/shell/query.js:78
即使在报告错误之后,db.currentOp() 仍然继续返回与上面类似的结果。日志中没有显示任何错误(只有表明该命令仍在运行的行):
2014-06-19T13:24:15.378-0400 [conn23] M/R: (1/3) Emit Progress: 8400
你知道这里发生了什么吗?运行环境是 Ubuntu 13.10 上的 MongoDB 2.6.2。
有时在 emit 中添加一些验证检查会有所帮助。我并不是说这总是有效,
但我有几次遇到这个错误时,问题就出在我所 emit 的值的质量上。