我有一个任务,它将avro输出写入由输入记录的几个字段组织的多个目录中。
例如: 处理历年国家/地区的记录 并写入国家/年份的目录结构 例如: 输出/USA/2015/outputs_usa_2015.avro 输出/UK/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear());
下面的代码将使用什么输出提交器来编写输出。与投机执行一起使用不安全吗? 对于推测执行,这会导致(可能导致)org.apache.hadoop.hdfs.server.namenode.LeaseExpireException
在这篇文章中 Hadoop Reducer:如何使用推测执行输出到多个目录? 建议使用自定义输出提交器
下面来自hadoop的AvroMultipleOutputs的代码没有说明推测执行的任何问题
private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
String baseFileName) throws IOException, InterruptedException {
writer =
((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
taskContext.getConfiguration())).getRecordWriter(taskContext);
...
}
如果基本输出路径在作业目录之外,写入方法也不会记录任何问题
public void write(String namedOutput, Object key, Object value, String baseOutputPath)
在作业目录外写入时,推测执行的 AvroMultipleOutputs(其他输出)是否存在真正的问题? 如果,那么我如何覆盖 AvroMultipleOutput 以拥有它自己的输出提交器。我在 AvroMultipleOutput 中看不到任何输出格式,它使用的输出提交器
AvroMultipleOutputs
将使用您注册到作业配置的OutputFormat
,同时添加命名输出,例如使用AvroMultipleOutputs
中的addNamedOutput
API(例如AvroKeyValueOutputFormat
)。
使用AvroMultipleOutputs
,您可能无法使用推理任务执行功能。即使覆盖它也不会有帮助,或者不会简单。
相反,您应该编写自己的OutputFormat
(最有可能扩展可用的 Avro 输出格式之一,例如AvroKeyValueOutputFormat
),并覆盖/实现其getRecordWriter
API,它将返回一个RecordWriter
实例,例如MainRecordWriter
(仅供参考)。
该MainRecordWriter
将维护RecordWriter
地图(例如AvroKeyValueRecordWriter
) 实例。这些RecordWriter
实例中的每一个都属于其中一个输出文件。在MainRecordWriter
write
的 API 中,您将从映射中获取实际的RecordWriter
实例(基于您要写入的记录),并使用此记录编写器写入记录。因此,MainRecordWriter
只是作为多个RecordWriter
实例的包装器工作。
对于一些类似的实现,您可能想研究库中piggybank
多存储类的代码。
当你将命名输出添加到AvroMultipleOutputs
时,它将调用AvroKeyOutputFormat.getRecordWriter()
或AvroKeyValueOutputFormat.getRecordWriter()
,它们调用AvroOutputFormatBase.getAvroFileOutputStream()
,其内容是
protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
return path.getFileSystem(context.getConfiguration()).create(path);
}
而AvroOutputFormatBase
扩展FileOutputFormat
(上述方法中的getOutputCommitter()
实际上是对FileOutputFormat.getOutputCommitter()
的调用。 因此,AvroMultipleOutputs
应具有与MultipleOutputs
相同的约束。