我真的陷入了困境,我必须强制mapReduce框架只对特定键使用一个reducer
。我还想影响框架对键的排序方式。我将通过一个例子介绍这个问题:
我想以以下形式发出键值对:
x b> :
<b x> :
<b> :
<b a x> :
图1
键是一个序列 - 如您所见 - 每个序列都以项目 b 开头,这将是数据类型 string
.值将ObjectID
s,如字母 d 和数字所示。我从 map
函数发出其他键值对,该函数以键中的不同项开头,例如 a 或 x:
<a b x> :
<x> :
<x a> :
图2
我需要强制框架为每个键值对调用单个reduce
函数,该函数以特定项目开头。此外,我必须在map
和reduce
之间强制排序,以按反向词典顺序对键进行排序。因此,单个化简器将接收项目 b 的以下键值对:
x b> :
<b x> :
<b a x> :
<b> :
图3
我尝试过:
我尝试以以下形式发出键值对:
通过这种方式,单个化简器B : <(D1 : b>)>
B : <(D2 : <B x>)>
B : <(D3 : <B>)>
B : <(D2 : ), (d3 : <b a x>) >
图4
接收到项b的值,但正如你所看到的,不是以反向词典顺序,最糟糕的是,不能保证单个化简器会获得特定键的所有值(正如MongoDB的MapReduce文档所述)。
基本上:我必须以相反的词典顺序处理这些以特定项目开头的序列。
我没有想法可以引导我进一步找到解决方案。如何对键强制实施单个化简器并影响排序?我应该如何设计传递(发出)的数据结构以满足我的需求?
这些功能类似于Hadoop的Comparator
和Partitioner
。
更新------------------------------------------------------------------------------------------------------------------------
Asya Kamsky 向我指出,finalize
每个键只运行一次,因此它解决了分区问题,当每个值必须由特定键的单个化简器看到时。
排序仍然是一个问题。对于大型数据集,在finalize
内部实现我自己的排序将意味着执行时间方面的巨大瓶颈,而我没有利用map
和reduce
之间的自然排序机制。键的数据类型为 string
,但很容易将它们编码为负数integers
强制反向排序。
让我们再次检查图 3:
x b> :
<b x> :
<b a x> :
<b> :
图3
这就是finalize
必须收到密钥 b 的内容。例如,键< b x b >
在这里是复合的。Finalize 需要接收以 b 开头的键,但对于键的其他部分,需要按反向字典顺序接收。
有没有办法实现这一点并避免finalize
内部的排序?
您可以做的是"正常"发出文档并使用reduce将所有发出的值合并到一个排序数组中。 然后使用finalize
方法在单个减速器中执行您要执行的任何处理。
MongoDB reduce函数可以多次调用,但也可以从不调用(在为特定键只发出单个值的情况下)。 使用 finalize
可以解决这两个问题,因为每个键只调用一次。
示例数据:
> db.sorts.find()
{ "_id" : 1, "b" : 1, "a" : 20 }
{ "_id" : 2, "b" : 1, "a" : 2 }
{ "_id" : 3, "b" : 2, "a" : 12 }
{ "_id" : 4, "b" : 3, "a" : 1 }
{ "_id" : 5, "b" : 2, "a" : 1 }
{ "_id" : 6, "b" : 3, "a" : 11 }
{ "_id" : 7, "b" : 3, "a" : 5 }
{ "_id" : 8, "b" : 2, "a" : 1 }
{ "_id" : 9, "b" : 1, "a" : 15 }
地图功能:
map = function() {
emit( this.b, { val: [ this.a ] } );
}
reduce函数,通过遍历数组将新的传入val添加到排序数组中:
reduce = function( key, values) {
var result = { val: [ ] };
values.forEach(function(v) {
var newval = v.val[0];
var added = false;
for (var i=0; i < result.val.length; i++) {
if (newval < result.val[i]) {
result.val.splice(i, 0, newval);
added=true;
break;
}
}
if ( !added ) {
result.val.splice(result.val.length, 0, newval);
}
});
return result;
}
Finalize 只返回一个简单的数组:
finalize = function( key, values ) {
// values is document with a sorted array
// do your "single reduce" functionality here
return values.val;
}
运行 MapReduce:
> db.sorts.mapReduce(map, reduce, {out:"outs", finalize:finalize})
{
"result" : "outs",
"timeMillis" : 10,
"counts" : {
"input" : 9,
"emit" : 9,
"reduce" : 3,
"output" : 3
},
"ok" : 1,
}
结果是:
> db.outs.find()
{ "_id" : 1, "value" : [ 2, 15, 20 ] }
{ "_id" : 2, "value" : [ 1, 1, 12 ] }
{ "_id" : 3, "value" : [ 1, 5, 11 ] }