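I have a MapReduce program that reads Avro data, processes it, and writes Avro data back out. The Avro schema has, say, 4 columns. I use GenericData.Record to write the Avro data.
Now I have created a Pig relation on top of this data using a schema with 5 columns. The fifth column is new and has a default value defined in the avsc file. As I understand it, I should be able to read the old data (written with 4 columns) using the new schema with one extra column. Instead, I get an error saying: trying to access a non-existent column.
What am I missing?
MapReduce driver code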
Job job = Job.getInstance(getConf());
job.setJarByClass(DeltaCaptureMRJobDriverWithSameSchema.class);
job.setJobName("CDC");
job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
//This is required to use avro-1.7.6 and above
job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setMapperClass(DeltaCaptureMapperMultiPaths.class);
Schema schema = new Schema.Parser().parse(new File(args[2]));
AvroJob.setInputKeySchema(job, schema);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AvroValue.class);
AvroJob.setMapOutputValueSchema(job, schema);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setReducerClass(DeltaCaptureReducerMultiPaths.class);
AvroJob.setOutputKeySchema(job, schema);
job.setOutputKeyClass(AvroKey.class);
return (job.waitForCompletion(true) ? 0 : 1);
Mapper code
public class DeltaCaptureMapperMultiPaths extends Mapper<AvroKey<GenericData.Record>, NullWritable, Text, AvroValue<GenericData.Record>> {
    private static final Logger LOG = Logger.getLogger(DeltaCaptureMapperMultiPaths.class);

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
        try {
            System.out.println("Specific Record - " + key);
            System.out.println("Datum :: " + key.datum());
            System.out.println("Schema :: " + key.datum().getSchema());
            AvroValue<GenericData.Record> outValue = new AvroValue<GenericData.Record>(key.datum());
            System.out.println("Generic Record (out) - " + key.datum());
            context.write(new Text(key.datum().get("id") + ""), outValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Reducer code
public class DeltaCaptureReducerMultiPaths extends Reducer<Text, AvroValue<GenericData.Record>, AvroKey<GenericData.Record>, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<AvroValue<GenericData.Record>> values, Context context) throws IOException, InterruptedException {
        for (AvroValue<GenericData.Record> value : values) {
            AvroKey<GenericData.Record> outKey = new AvroKey<GenericData.Record>(value.datum());
            context.write(outKey, NullWritable.get());
        }
    }
}
Say the MR job writes its output to /etl/out. The following Pig script then fails with the error I described at the beginning.
a = LOAD '/etl/out' USING org.apache.pig.builtin.AvroStorage('hdfs:///etl/test.avsc');
b = FOREACH a GENERATE $0,$1,$2,$3,$4;
hdfs:///etl/test.avsc has 5 fields (the fifth is the new one).
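As suspected, the problem was how AvroStorage was being used. The correct usage is:
a = LOAD '/etl/out' USING org.apache.pig.builtin.AvroStorage('test', '-schemafile file:///etl/test.avsc');
This assumes test.avsc is on the local file system. Keeping the avsc in HDFS should work too, but I have not gotten around to testing it. I don't know why there is no clear documentation on this!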
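For anyone wondering why the fifth column looked "missing" even though it has a default: a default only takes effect when the 5-field schema is actually applied as the reader schema. My guess (not verified against the Pig source) is that the single-argument call did not treat the path as a schema file, so the relation ended up with the 4-field schema stored with the data. The sketch below is a toy model in plain Java, not the Avro or Pig API; the class SchemaDefaultsSketch, its Field type, and every field name except id are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of reader/writer schema resolution. This is NOT the Avro API. */
public class SchemaDefaultsSketch {

    /** A reader-schema field: a name plus an optional default value. */
    public static final class Field {
        final String name;
        final Object defaultValue; // null means "no default declared" in this toy model

        public Field(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    /** Builds a row in reader-schema order, filling absent fields from their defaults. */
    public static List<Object> resolve(Map<String, Object> written, List<Field> readerFields) {
        List<Object> row = new ArrayList<>();
        for (Field f : readerFields) {
            if (written.containsKey(f.name)) {
                row.add(written.get(f.name));   // field exists in the old data
            } else if (f.defaultValue != null) {
                row.add(f.defaultValue);        // new field: fall back to the default
            } else {
                throw new IllegalStateException(
                        "Field " + f.name + " is missing and has no default");
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // Hypothetical 4-column record, standing in for what the MR job wrote.
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("id", 1);
        written.put("name", "a");
        written.put("city", "b");
        written.put("amount", 2);

        // Hypothetical 5-field reader schema; only the new field carries a default.
        List<Field> readerFields = Arrays.asList(
                new Field("id", null),
                new Field("name", null),
                new Field("city", null),
                new Field("amount", null),
                new Field("status", "n/a"));

        // Without the reader schema there are only 4 columns, so $4 does not exist.
        List<Object> unresolved = new ArrayList<>(written.values());
        System.out.println("without reader schema: " + unresolved.size() + " columns");

        // With the reader schema applied, the fifth column is filled from its default.
        List<Object> resolved = resolve(written, readerFields);
        System.out.println("with reader schema: " + resolved.size()
                + " columns, $4 = " + resolved.get(4));
    }
}
```

The point of the model is that the old data never changes; the extra column only appears once the loader is handed the new schema, which is what the -schemafile option does in the working call above.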