MapReduce数据丢失



我的代码执行10000行。

映射器伪代码:

int rows=0;
map()
   {rows++}  
cleanup(Context c)
   {print(rows)}

这段代码打印:

2669
3354
3353
621
(sum=9997)

为什么总和是9997?

减速器伪代码:

int rows=0;
reduce()
   {rows++}  
cleanup(Context c)
   {print(rows)}

减速器打印:3354

所有其他数据在哪里?

编辑1

我已经找到主要问题了。

我的错误是发送的键是行号。当映射器调用cleanup()函数时,它重置行计数器(保存在应用程序的驱动程序中)。因此这个键不是唯一的。我可以通过从map函数的参数中发送键来解决这个问题吗?我不认为cleanup()会重置这个参数。

如果我在应用程序的驱动程序中使用全局变量,是否存在同步问题?

编辑2

我的代码执行10000行(和1标题行)

驱动程序伪代码:

public static enum COUNTER {ROW};

映射器伪代码:

map()
   {row=context.getCounter(RWDriver.COUNTER.ROW).increment(1);
    context.write(row,new Text(...))
   }     
cleanup(Context c)
   {print(c.getCounter(RWDriver.COUNTER.ROW).getValue());}

这段代码打印:

2670
3355
3354
622
(sum=10001 correct)

在2670,3355之后,缓冲区已满,MapReduce自动将计数器ROW重置为0。我需要实际的行数,但是这个方法不起作用

对数据的解释可能是错误的。

你应该使用Map-Reduce框架计数器或用户定义的计数器:


Map-Reduce Framework Counters

Map input records
Map output records
Map output bytes
Reduce input groups
Reduce input records
Reduce output records

用户自定义计数器

class mapper()
{
static enum Counters { INPUT_LINES }
map()
{
  context.getCounter(Counters.INPUT_LINES).increment(1);
}

在Reducer中也类似。

获取计数器的值

Configuration conf = new Configuration();
Cluster cluster = new Cluster(conf);
Job job = Job.getInstance(cluster,conf);
result = job.waitForCompletion(true);
...
Counters counters = job.getCounters();
for (CounterGroup group : counters) {
  System.out.println("* Counter Group: " + group.getDisplayName() + " (" + group.getName() + ")");
  System.out.println("  number of counters in this group: " + group.size());
  for (Counter counter : group) {
    System.out.println("  - " + counter.getDisplayName() + ": " + counter.getName() + ": "+counter.getValue());
  }
}

最新更新