MongoDB的mapReduce:将键划分为单个化简器并影响键排序



我真的陷入了困境,我必须强制mapReduce框架只对特定键使用一个reducer。我还想影响框架对键的排序方式。我将通过一个例子介绍这个问题:

我想以以下形式发出键值对:

x b> :
<b x> :
<b> :
<b a x> :
图1

键是一个序列 - 如您所见 - 每个序列都以项目 b 开头,这将是数据类型 string .值将ObjectID s,如字母 d 和数字所示。我从 map 函数发出其他键值对,该函数以键中的不同项开头,例如 ax

<a b x> :
<x> :
<x a>
图2

我需要强制框架为每个键值对调用单个reduce函数,该函数以特定项目开头。此外,我必须在mapreduce之间强制排序,以按反向词典顺序对键进行排序。因此,单个化简器将接收项目 b 的以下键值对:

x b> :
<b x> :
<b a x> :
<b> :
图3

我尝试过:

我尝试以以下形式发出键值对:

B : <(D1 : b>)>
B : <(D2 : <B x>)>
B : <(D3 : <B>)>
B : <(D2 : ), (d3 : <b a x>) >
图4

通过这种方式,单个化简器

接收到项b的值,但正如你所看到的,不是以反向词典顺序,最糟糕的是,不能保证单个化简器会获得特定键的所有值(正如MongoDB的MapReduce文档所述)。

基本上:我必须以相反的词典顺序处理这些以特定项目开头的序列。

我没有想法可以引导我进一步找到解决方案。如何对键强制实施单个化简器并影响排序?我应该如何设计传递(发出)的数据结构以满足我的需求?

这些功能类似于Hadoop的ComparatorPartitioner

更新------------------------------------------------------------------------------------------------------------------------

Asya Kamsky 向我指出,finalize每个键只运行一次,因此它解决了分区问题,当每个值必须由特定键的单个化简器看到时。

排序仍然是一个问题。对于大型数据集,在finalize内部实现我自己的排序将意味着执行时间方面的巨大瓶颈,而我没有利用mapreduce之间的自然排序机制。键的数据类型为 string ,但很容易将它们编码为负数integers强制反向排序。

让我们再次检查图 3

x b> :
<b x> :
<b a x> :
<b> :
图3

这就是finalize必须收到密钥 b 的内容。例如,键< b x b >在这里是复合的。Finalize 需要接收以 b 开头的键,但对于键的其他部分,需要按反向字典顺序接收。

有没有办法实现这一点并避免finalize内部的排序?

您可以做的是"正常"发出文档并使用reduce将所有发出的值合并到一个排序数组中。 然后使用finalize方法在单个减速器中执行您要执行的任何处理。

MongoDB reduce函数可以多次调用,但也可以从不调用(在为特定键只发出单个值的情况下)。 使用 finalize 可以解决这两个问题,因为每个键只调用一次。

示例数据:

> db.sorts.find()
{ "_id" : 1, "b" : 1, "a" : 20 }
{ "_id" : 2, "b" : 1, "a" : 2 }
{ "_id" : 3, "b" : 2, "a" : 12 }
{ "_id" : 4, "b" : 3, "a" : 1 }
{ "_id" : 5, "b" : 2, "a" : 1 }
{ "_id" : 6, "b" : 3, "a" : 11 }
{ "_id" : 7, "b" : 3, "a" : 5 }
{ "_id" : 8, "b" : 2, "a" : 1 }
{ "_id" : 9, "b" : 1, "a" : 15 }

地图功能:

map = function() {
   emit( this.b, { val: [ this.a ] } );
}

reduce函数,通过遍历数组将新的传入val添加到排序数组中:

reduce = function( key, values) {
   var result = { val: [ ] };
   values.forEach(function(v) {
      var newval = v.val[0];
      var added = false;
      for (var i=0; i < result.val.length; i++) {
           if (newval < result.val[i]) {
                 result.val.splice(i, 0, newval);
                 added=true;
                 break;
           }
      }
      if ( !added ) {
         result.val.splice(result.val.length, 0, newval);
      }
   });
   return result;
}

Finalize 只返回一个简单的数组:

finalize = function( key, values ) {
   // values is document with a sorted array
   // do your "single reduce" functionality here
   return values.val;
}

运行 MapReduce:

> db.sorts.mapReduce(map, reduce, {out:"outs", finalize:finalize})
{
    "result" : "outs",
    "timeMillis" : 10,
    "counts" : {
        "input" : 9,
        "emit" : 9,
        "reduce" : 3,
        "output" : 3
    },
    "ok" : 1,
}

结果是:

> db.outs.find()
{ "_id" : 1, "value" : [  2,  15,  20 ] }
{ "_id" : 2, "value" : [  1,  1,  12 ] }
{ "_id" : 3, "value" : [  1,  5,  11 ] }

相关内容

  • 没有找到相关文章

最新更新