我的代码执行10000行。
映射器伪代码:
int rows=0;
map()
{rows++}
cleanup(Context c)
{print(rows)}
这段代码打印:
2669
3354
3353
621
(sum=9997)
为什么总和是9997?
减速器伪代码:
int rows=0;
reduce()
{rows++}
cleanup(Context c)
{print(rows)}
减速器打印:3354
所有其他数据在哪里?
编辑1
我已经找到主要问题了。
我的错误是发送的键是行号。当映射器调用cleanup()
函数时,它重置行计数器(保存在应用程序的驱动程序中)。因此这个键不是唯一的。我可以通过从map函数的参数中发送键来解决这个问题吗?我不认为cleanup()
会重置这个参数。
如果我在应用程序的驱动程序中使用全局变量,是否存在同步问题?
编辑2
我的代码执行10000行(和1标题行)
驱动程序伪代码:
public static enum COUNTER {ROW};
映射器伪代码:
map()
{row=context.getCounter(RWDriver.COUNTER.ROW).increment(1);
context.write(row,new Text(...))
}
cleanup(Context c)
{print(c.getCounter(RWDriver.COUNTER.ROW).getValue());}
这段代码打印:
2670
3355
3354
622
(sum=10001 correct)
在2670,3355之后,缓冲区已满,MapReduce自动将计数器ROW重置为0。我需要实际的行数,但是这个方法不起作用
对数据的解释可能是错误的。
你应该使用Map-Reduce框架计数器或用户定义的计数器:
Map-Reduce Framework Counters
Map input records
Map output records
Map output bytes
Reduce input groups
Reduce input records
Reduce output records
用户自定义计数器
class mapper()
{
static enum Counters { INPUT_LINES }
map()
{
context.getCounter(Counters.INPUT_LINES).increment(1);
}
在Reducer中也类似。
获取计数器的值
Configuration conf = new Configuration();
Cluster cluster = new Cluster(conf);
Job job = Job.getInstance(cluster,conf);
result = job.waitForCompletion(true);
...
Counters counters = job.getCounters();
for (CounterGroup group : counters) {
System.out.println("* Counter Group: " + group.getDisplayName() + " (" + group.getName() + ")");
System.out.println(" number of counters in this group: " + group.size());
for (Counter counter : group) {
System.out.println(" - " + counter.getDisplayName() + ": " + counter.getName() + ": "+counter.getValue());
}
}