How to use the ORCFile input/output format in MapReduce



I need to implement a custom I/O format based on the ORCFile I/O format. How can I do this?

Specifically, I need a way to include the ORCFile library in my source code (this is a custom Pig implementation), write data using the ORCFile output format, and then read that data back using the ORCFile input format.

You need to create a subclass of InputFormat (or of FileInputFormat, depending on the nature of your files).

Just search for "Hadoop InputFormat" and you will find plenty of articles and tutorials on how to create your own InputFormat class.
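
For reference, a bare-bones version of that approach might look like the sketch below: a FileInputFormat subclass in the old org.apache.hadoop.mapred API that simply delegates record reading to Hive's OrcInputFormat, so each record arrives as an OrcStruct. The class name MyOrcInputFormat is made up for illustration.

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical wrapper: exposes ORC files through your own InputFormat class
// by delegating to Hive's OrcInputFormat.
public class MyOrcInputFormat extends FileInputFormat<NullWritable, OrcStruct> {
    private final OrcInputFormat orc = new OrcInputFormat();

    @Override
    public RecordReader<NullWritable, OrcStruct> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        // Hand the split to the ORC reader; each record is delivered as an OrcStruct.
        return orc.getRecordReader(split, job, reporter);
    }
}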

The HCatalog library can be used to read and write ORC files in MapReduce.
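
To sketch what the HCatalog route looks like on the read side (the database default and table my_orc_table are placeholders, and HCatInputFormat lives in org.apache.hive.hcatalog.mapreduce, or org.apache.hcatalog.mapreduce in older releases), the driver of a job using the newer org.apache.hadoop.mapreduce API could be configured roughly like this; HCatOutputFormat plays the corresponding role when writing:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "read-orc-via-hcatalog");
job.setJarByClass(MyHCatDriver.class);            // hypothetical driver class

// Point HCatalog at the Hive table stored as ORC; the table schema is fetched
// from the metastore, so no column/type properties are needed in the job itself.
HCatInputFormat.setInput(job, "default", "my_orc_table");
job.setInputFormatClass(HCatInputFormat.class);

// The mapper then receives HCatRecord values, e.g.
// Mapper<WritableComparable, HCatRecord, Text, IntWritable>,
// and reads fields by position or by name via the table's HCatSchema.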

I have written just a small sample here; hope it helps. Sample mapper code:

// Imports needed at the top of the enclosing OrcReader file (old "mapred" API plus Hive's serde2/orc classes):
import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public static class MyMapper<K extends WritableComparable, V extends Writable>
        extends MapReduceBase implements Mapper<K, OrcStruct, Text, IntWritable> {

    // 0-based column positions in "a,b,c" and in the nested struct<d:int,e:string>.
    private static final int B_ID = 1;
    private static final int C_ID = 2;
    private static final int D_ID = 0;
    private static final int E_ID = 1;

    private StructObjectInspector oip;
    private final OrcSerde serde = new OrcSerde();

    public void configure(JobConf job) {
        // Describe the file's schema to the SerDe.
        Properties table = new Properties();
        table.setProperty("columns", "a,b,c");
        table.setProperty("columns.types", "int,string,struct<d:int,e:string>");
        serde.initialize(job, table);
        try {
            oip = (StructObjectInspector) serde.getObjectInspector();
        } catch (SerDeException e) {
            e.printStackTrace();
        }
    }

    public void map(K key, OrcStruct val,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        System.out.println(val); // debug: dump the whole row
        List<? extends StructField> fields = oip.getAllStructFieldRefs();

        // Column b: a string.
        StringObjectInspector bInspector =
                (StringObjectInspector) fields.get(B_ID).getFieldObjectInspector();
        String b = "garbage";
        try {
            b = bInspector.getPrimitiveJavaObject(
                    oip.getStructFieldData(serde.deserialize(val), fields.get(B_ID)));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }

        // Column c: a nested struct<d:int,e:string>.
        OrcStruct struct = null;
        try {
            struct = (OrcStruct) oip.getStructFieldData(serde.deserialize(val), fields.get(C_ID));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }
        StructObjectInspector cInspector =
                (StructObjectInspector) fields.get(C_ID).getFieldObjectInspector();
        // Use the nested struct's own field refs to pull out d and e.
        List<? extends StructField> cFields = cInspector.getAllStructFieldRefs();
        int d = ((IntWritable) cInspector.getStructFieldData(struct, cFields.get(D_ID))).get();
        String e = cInspector.getStructFieldData(struct, cFields.get(E_ID)).toString();

        output.collect(new Text(b), new IntWritable(1));
        output.collect(new Text(e), new IntWritable(1));
    }
}
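
The driver code below also refers to OrcReader.MyReducer, which the answer does not show. A minimal sum-style reducer matching the mapper's (Text, IntWritable) output, written against the same old mapred API (it additionally needs java.util.Iterator and org.apache.hadoop.mapred.Reducer imports), could look like this; treat it as an assumption rather than part of the original code:

public static class MyReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Sum up the per-record counts emitted by the mapper (and combiner).
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}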

Driver code:

JobConf job = new JobConf(new Configuration(), OrcReader.class);
// Specify various job-specific parameters
job.setJobName("myjob");
job.set("mapreduce.framework.name", "local");
job.set("fs.default.name", "file:///");
job.set("log4j.logger.org.apache.hadoop", "INFO");
// Push down the projection so only columns b and c are read from the ORC file
job.set("hive.io.file.readcolumn.ids", "1,2");
job.set("hive.io.file.read.all.columns", "false");
job.set("hive.io.file.readcolumn.names", "b,c");
FileInputFormat.setInputPaths(job, new Path("./src/main/resources/000000_0.orc"));
FileOutputFormat.setOutputPath(job, new Path("./target/out1"));
job.setMapperClass(OrcReader.MyMapper.class);
job.setCombinerClass(OrcReader.MyReducer.class);
job.setReducerClass(OrcReader.MyReducer.class);

job.setInputFormat(OrcInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
JobClient.runJob(job);
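
The question also asks about writing data with the ORCFile output format, which the sample above does not cover. One commonly used write path, sketched here under assumptions (MyRow is a made-up POJO with a matching constructor, and the surrounding reducer uses the newer org.apache.hadoop.mapreduce API), pairs OrcSerde with Hive's OrcNewOutputFormat:

// Driver side (new mapreduce API):
//   job.setOutputFormatClass(OrcNewOutputFormat.class);  // org.apache.hadoop.hive.ql.io.orc

// Reducer side: turn a plain Java object into an ORC row through an ObjectInspector.
ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); // MyRow is hypothetical
OrcSerde serde = new OrcSerde();
Writable row = serde.serialize(new MyRow(1, "x"), inspector);
context.write(NullWritable.get(), row); // the ORC record writer only looks at the value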
