我有一个hadoop程序,我想在该程序中向映射器末尾的驱动程序报告一个值。因此,在驱动程序中,我有多个值,每个值都来自一个映射器,然后我想得到值之间的最大值。我用Counter编码,这是我的代码:
protected void cleanup(Context context){
....
context.getCounter("TimeStamps", "Max").setValue(value);
}
在驱动器中我有
CounterGroup counters = job.getCounters().getGroup("TimeStamps");
Iterator<Counter> iter = counters.iterator();
while(iter.hasNext()){
Counter c = iter.next();
}
但是,只报告一个值(而不是多个值)。我应该怎么做才能让每个映射器报告一个单独的值,并且我可以在驱动程序中获取所有值?
有一种方法可以实现这一点(不过,我觉得这不是一种干净的方法)。
当您在映射器中发出计数器时,您可以获得任务尝试id,并在计数器名称前加上此前缀。
例如,在WordCount程序的map()
方法中,对于遇到的每个单词,我都会发出:
context.getCounter("ME", context.getTaskAttemptID() + ":MY_TOTAL").increment(1);
上面,ME
是计数器组。计数器名称是Task Attempt ID
和MY_TOTAL
的组合。
例如,映射任务尝试ID的形式通常为:task_1450681906391_0024_m_000000_0
(m_000000_0
用于作业中的第一个映射器)。
因此,此映射程序的计数器名称将为:task_1450681906391_0024_m_000000_0:MY_TOTAL
。
类似地,不同映射器的计数器名称为:
Mapper 2 => task_1450681906391_0024_m_000001_0:MY_TOTAL
Mapper 3 => task_1450681906391_0024_m_000002_0:MY_TOTAL
Mapper 4 => task_1450681906391_0024_m_000003_0:MY_TOTAL
现在,在驱动程序中,在设置Job::waitForCompletion(true)
:之后,我添加了以下代码
job.waitForCompletion(true);
CounterGroup counters = job.getCounters().getGroup("ME");
Iterator<Counter> iter = counters.iterator();
while(iter.hasNext()) {
Counter c = iter.next();
System.out.println(c.getName() + " " + c.getValue());
}
我为1GB的数据运行了单词计数程序,它产生了7个映射器。成功执行程序后,我得到了以下输出:
attempt_1450681906391_0024_m_000000_0:MY_TOTAL 9318964
attempt_1450681906391_0024_m_000001_0:MY_TOTAL 9068018
attempt_1450681906391_0024_m_000002_0:MY_TOTAL 9241336
attempt_1450681906391_0024_m_000003_0:MY_TOTAL 9182102
attempt_1450681906391_0024_m_000004_0:MY_TOTAL 8948100
attempt_1450681906391_0024_m_000005_0:MY_TOTAL 8992634
attempt_1450681906391_0024_m_000006_0:MY_TOTAL 8564646
您可以看到,不同映射器的计数器是单独发出的。您可以很容易地解析出映射器,并获得每个映射器的计数器,如下所示:
Mapper 1 => MY_TOTAL 9318964
Mapper 2 => MY_TOTAL 9068018
Mapper 3 => MY_TOTAL 9241336
Mapper 4 => MY_TOTAL 9182102
Mapper 5 => MY_TOTAL 8948100
Mapper 6 => MY_TOTAL 8992634
Mapper 7 => MY_TOTAL 8564646