我已经编写了一个mapreduce程序来处理日志。除了实际输出之外,作业还会将侧面数据写入驱动程序代码中设置的输出路径外部的其他位置。但是,如果启用了推测执行,则不会删除终止任务尝试的输出。有办法避免这个问题吗?除了在作业完成后写入到正常输出位置和复制到外部位置之外,是否可以解决其他问题?
是否可以使用"OutputCommitter"解决此问题?
有人试过这个吗?如有任何帮助,我们将不胜感激。
是的,FileOutputCommitter可以用于在任务成功时将临时任务目录的内容移动到最终输出目录,并删除原始任务目录。
我相信Hadoop中扩展FileOutputFormat的大多数内置输出格式都使用OutputCommitter,默认情况下是FileOutputCommiter。
这是FileOutputFormat 的代码
public synchronized
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
要写入多个路径,您可能可以查看MultipleOutputs,默认情况下使用OutputCommitter。
或者,您可以创建自己的输出格式并扩展FileOutputFomat和覆盖FileOutputFormat中的上述函数,查看FileOutputCommitter代码创建自己的OutputCommiter实现。
在FileOoutputcommitter代码中,您会发现您可能感兴趣的函数-
/**
* Delete the work directory
*/
@Override
public void abortTask(TaskAttemptContext context) {
try {
if (workPath != null) {
context.progress();
outputFileSystem.delete(workPath, true);
}
} catch (IOException ie) {
LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
}
}
如果任务成功,则调用commitTask(),默认情况下实现移动临时任务输出目录(任务名称中的任务尝试ID,以避免任务之间的冲突尝试)到最终输出路径${mapred.out-put.dir}。否则,框架调用abortTask(),它删除临时任务输出目录。
为了避免在mapreduce输出文件夹中创建_logs和_SUCCESS文件,可以使用以下设置:
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",false);conf.set("hadoop.job.history.user.location","none")