我收到以下错误:
Task attempt_201304161625_0028_m_000000_0 failed to report status for 600 seconds. Killing!
用于我的Map作业。这个问题类似于这个,这个,还有这个。但是,我不想增加hadoop杀死未报告进度的任务的默认时间,即
Configuration conf=new Configuration();
long milliSeconds = 1000*60*60;
conf.setLong("mapred.task.timeout", milliSeconds);
相反,我想使用context.progress()
, context.setStatus("Some Message")
或context.getCounter(SOME_ENUM.PROGRESS).increment(1)
或类似的东西定期报告进度。然而,这仍然会导致任务被杀死。下面是我试图报告进度的代码片段。映射器:
protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
//do some things
Optimiser optimiser = new Optimiser();
optimiser.optimiseFurther(<some parameters>, context);
//more things
context.write(newKey, newValue);
}
优化器类中的optimisfurther方法:
public void optimiseFurther(<Some parameters>, TaskAttemptContext context) {
int count = 0;
while(something is true) {
//optimise
//try to report progress
context.setStatus("Progressing:" + count);
System.out.println("Optimise Progress:" + context.getStatus());
context.progress();
count++;
}
}
映射器的输出显示状态正在更新:
Optimise Progress:Progressing:0
Optimise Progress:Progressing:1
Optimise Progress:Progressing:2
...
但是,作业仍然在默认时间后被杀死。我是否以错误的方式使用上下文?在工作设置中,我还需要做什么才能成功地报告进度?
这个问题是与Hadoop 0.20中的一个bug有关,即对context.setStatus()
和context.progress()
的调用没有报告给底层框架(设置各种计数器的调用也不工作)。有一个可用的补丁,所以更新到一个新版本的Hadoop应该可以解决这个问题。
可能发生的情况是你必须在Context中调用Reporter本身的进度方法而不能在Context本身调用它
从Cloudera<报告进展/strong>
如果你的任务在10分钟内没有报告任何进展(参见mapred.task.timeout属性),那么它将被Hadoop杀死。大多数任务不会遇到这种情况,因为它们通过读取输入和写入输出来隐式地报告进度。然而,一些不以这种方式处理记录的作业可能会与此行为发生冲突,并导致其任务被终止。模拟就是一个很好的例子,因为它们在每个映射中执行大量cpu密集型处理,并且通常只在计算结束时写入结果。它们应该以这样一种方式编写,以便定期报告进展(比每10分钟更频繁)。这可以通过多种方式实现:
Call setStatus() on Reporter to set a human-readable description of
the task’s progress
Call incrCounter() on Reporter to increment a user counter
Call progress() on Reporter to tell Hadoop that your task is
still there (and making progress)
Cloudera技巧
public Context(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split)