具有推测执行的Hadoop多个输出



我有一个任务,它将avro输出写入由输入记录的几个字段组织的多个目录中。

例如: 处理历年国家/地区的记录 并写入国家/年份的目录结构 例如: 输出/USA/2015/outputs_usa_2015.avro 输出/UK/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(), 
OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear());

下面的代码将使用什么输出提交器来编写输出。与投机执行一起使用不安全吗? 对于推测执行,这会导致(可能导致)org.apache.hadoop.hdfs.server.namenode.LeaseExpireException

在这篇文章中 Hadoop Reducer:如何使用推测执行输出到多个目录? 建议使用自定义输出提交器

下面来自hadoop的AvroMultipleOutputs的代码没有说明推测执行的任何问题

private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
String baseFileName) throws IOException, InterruptedException {
writer =
((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
taskContext.getConfiguration())).getRecordWriter(taskContext);
...
}

如果基本输出路径在作业目录之外,写入方法也不会记录任何问题

public void write(String namedOutput, Object key, Object value, String baseOutputPath)

在作业目录外写入时,推测执行的 AvroMultipleOutputs(其他输出)是否存在真正的问题? 如果,那么我如何覆盖 AvroMultipleOutput 以拥有它自己的输出提交器。我在 AvroMultipleOutput 中看不到任何输出格式,它使用的输出提交器

AvroMultipleOutputs将使用您注册到作业配置的OutputFormat,同时添加命名输出,例如使用AvroMultipleOutputs中的addNamedOutputAPI(例如AvroKeyValueOutputFormat)。

使用AvroMultipleOutputs,您可能无法使用推理任务执行功能。即使覆盖它也不会有帮助,或者不会简单。

相反,您应该编写自己的OutputFormat(最有可能扩展可用的 Avro 输出格式之一,例如AvroKeyValueOutputFormat),并覆盖/实现其getRecordWriterAPI,它将返回一个RecordWriter实例,例如MainRecordWriter(仅供参考)。

MainRecordWriter将维护RecordWriter地图(例如AvroKeyValueRecordWriter) 实例。这些RecordWriter实例中的每一个都属于其中一个输出文件。在MainRecordWriterwrite的 API 中,您将从映射中获取实际的RecordWriter实例(基于您要写入的记录),并使用此记录编写器写入记录。因此,MainRecordWriter只是作为多个RecordWriter实例的包装器工作。

对于一些类似的实现,您可能想研究库中piggybank多存储类的代码。

当你将命名输出添加到AvroMultipleOutputs时,它将调用AvroKeyOutputFormat.getRecordWriter()AvroKeyValueOutputFormat.getRecordWriter(),它们调用AvroOutputFormatBase.getAvroFileOutputStream(),其内容是

protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
return path.getFileSystem(context.getConfiguration()).create(path);
}

AvroOutputFormatBase扩展FileOutputFormat(上述方法中的getOutputCommitter()实际上是对FileOutputFormat.getOutputCommitter()的调用。 因此,AvroMultipleOutputs应具有与MultipleOutputs相同的约束。

相关内容

  • 没有找到相关文章

最新更新