MapReduce with Avro - Generic Parsing



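Problem statement:

  1. Data in Avro format is available in HDFS.
  2. The schema for that Avro data is also available.
  3. This Avro data needs to be parsed in MapReduce, producing output Avro data with the same schema (the incoming Avro data needs to be cleansed).
  4. The incoming Avro data can have any schema.

So the requirement is to write a generic MapReduce job that accepts any Avro data and emits Avro output with the same schema as the input.

Code (after many attempts, this is how far I got)

Driver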

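import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // Job.getInstance replaces the deprecated Job constructor
        Job job = Job.getInstance(getConf());
        job.setJarByClass(AvroMapper.class);
        job.setJobName("Avro With Xml Mapper");
        job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
        // This is required to use avro-1.7.6 and above
        job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setMapperClass(AvroMapper.class);
        // args[2] is the local path to the input Avro schema file
        Schema schema = new Schema.Parser().parse(new File(args[2]));
        AvroJob.setInputKeySchema(job, schema);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapOutputKeyClass(AvroKey.class);
        AvroJob.setOutputKeySchema(job, schema);
        // Map-only job: the mapper output is written directly as Avro
        job.setNumReduceTasks(0);
        return (job.waitForCompletion(true) ? 0 : 1);
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new AvroDriver(), args);
        System.exit(res);
    }
}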

Mapper

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
            try {
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();

                GenericRecord record = new GenericData.Record(key.datum().getSchema());
                for(Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                GenericData d = new GenericData();
                d.newRecord(record, key.datum().getSchema());
                AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
                System.out.println("Generic Record (Avro Key) - " + outkey);
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
                throw new IOException(e.getMessage());
            }
        }
    }

Command

hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema

Sample Avro schema

{ "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields".......

Problem faced when running the MapReduce job

Error running child : java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

Environment

HDP 2.3 Sandbox

Any thoughts?

UPDATE

I also tried the following, but got the same result.

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {
        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
            try {
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();
                Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
                List<Field> outFields  = new ArrayList<Field>();
                for(Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
                    outFields.add(f1);
                }
                s.setFields(outFields);
                System.out.println("Out Schema - " + s);
                GenericRecord record = new GenericData.Record(s);
                for(Field f : fields) {
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                GenericData d = new GenericData();
                d.newRecord(record, s);
                AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
                System.out.println("Generic Record (Avro Key) - " + outkey.datum());
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

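Note that the Avro input to the MapReduce job works fine; the problem here is writing the output in Avro format.

Finally, I found the answer; the working mapper code is below. Instead of emitting an AvroKey that wraps GenericData, I changed it to emit an AvroKey that wraps GenericData.Record.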

public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {
        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {
            try {
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();
                Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
                List<Field> outFields  = new ArrayList<Field>();
                for(Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
                    outFields.add(f1);
                }
                s.setFields(outFields);
                System.out.println("Out Schema - " + s);
                GenericData.Record record = new GenericData.Record(s);
                for(Field f : fields) {
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                AvroKey<GenericData.Record> outkey = new AvroKey<GenericData.Record>(record);
                System.out.println("Generic Record (Avro Key) - " + outkey.datum());
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(e);
                System.out.println(e.getMessage());
                throw new IOException(e.getMessage());
            }
        }
    }
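
To illustrate why the change of datum type matters, here is a minimal standalone sketch (not part of the original job). As far as I can tell, GenericData is a utility class rather than a datum: newRecord returns a record and stores nothing in the GenericData instance, so the earlier attempts discarded the returned record and wrapped an empty helper object in the AvroKey. The class name GenericCopySketch, the helper names copyRecord and writeToBytes, and the two-field schema are assumptions made up for illustration. Unlike the mapper above, which builds an all-string output schema, this sketch reuses the input record's own schema, on the assumption that the output is meant to have exactly the same schema as the input.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class GenericCopySketch {

    // Copy every field into a new record that reuses the input's schema,
    // so the sketch works for any record schema without knowing it up front.
    static GenericData.Record copyRecord(GenericRecord in) {
        Schema schema = in.getSchema();
        GenericData.Record out = new GenericData.Record(schema);
        for (Schema.Field f : schema.getFields()) {
            // A cleansing step for the field value would go here.
            out.put(f.name(), in.get(f.name()));
        }
        return out;
    }

    // Serialize one record to an in-memory Avro container file and
    // return the number of bytes written.
    static int writeToBytes(GenericRecord record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(
                new GenericDatumWriter<GenericRecord>(record.getSchema()))) {
            writer.create(record.getSchema(), bytes);
            writer.append(record);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical schema standing in for com.sample.avro.Entity;
        // the real field list is elided in the question.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Entity\",\"namespace\":\"com.sample.avro\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"},"
          + "{\"name\":\"name\",\"type\":\"string\"}]}");

        GenericData.Record in = new GenericData.Record(schema);
        in.put("id", 1L);
        in.put("name", "a");

        // newRecord hands back a record; the GenericData object itself
        // never holds the data, so it cannot serve as the datum.
        Object returned = GenericData.get().newRecord(in, schema);
        System.out.println("newRecord returned a record: "
            + (returned instanceof GenericData.Record));

        GenericData.Record out = copyRecord(in);
        System.out.println("copy equals input: " + out.equals(in));
        System.out.println("bytes written: " + writeToBytes(out));
    }
}
```

In a mapper, the equivalent step would be to wrap the result of copyRecord(key.datum()) in an AvroKey<GenericData.Record> and pass it to context.write, which is what the working mapper above does with its own record.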
