I need to implement a custom I/O format based on the ORCFile I/O format. How do I go about that?
Specifically, I need a way to include the ORCFile library in my source code (which is a custom Pig implementation), write data using the ORCFile output format, and then read it back using the ORCFile input format.
You need to create a subclass of the InputFormat class (or of FileInputFormat, depending on the nature of your files); see the sketch below.
Just google for Hadoop InputFormat and you will find plenty of articles and tutorials on how to create your own InputFormat class.
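As a rough illustration of that advice, a skeleton using the old mapred API could look like the following. MyRecord and MyRecordReader are hypothetical placeholders for your own value type and record reader, not classes from any library:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyInputFormat extends FileInputFormat<NullWritable, MyRecord> {
    @Override
    public RecordReader<NullWritable, MyRecord> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        // Hand each split to your own RecordReader, which turns the file bytes into MyRecord values
        return new MyRecordReader(split, job, reporter);
    }
}

The RecordReader you return is where the actual parsing happens; the InputFormat itself mostly decides how files are split and which reader handles each split.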
You can use the HCatalog library to read and write ORC files in MapReduce.
Here is just some sample code; hope it helps. Sample mapper code:
// OrcStruct, OrcSerde and the object inspectors below come from the hive-exec jar
// (org.apache.hadoop.hive.ql.io.orc.* and org.apache.hadoop.hive.serde2.objectinspector.*).
public static class MyMapper<K extends WritableComparable, V extends Writable>
        extends MapReduceBase implements Mapper<K, OrcStruct, Text, IntWritable> {

    // Column positions: the table schema is "a,b,c", and column c is struct<d:int,e:string>
    private static final int B_ID = 1;
    private static final int C_ID = 2;
    private static final int D_ID = 0;
    private static final int E_ID = 1;

    private StructObjectInspector oip;
    private final OrcSerde serde = new OrcSerde();

    @Override
    public void configure(JobConf job) {
        // Describe the ORC schema to the SerDe so it can build an ObjectInspector
        Properties table = new Properties();
        table.setProperty("columns", "a,b,c");
        table.setProperty("columns.types", "int,string,struct<d:int,e:string>");
        serde.initialize(job, table);
        try {
            oip = (StructObjectInspector) serde.getObjectInspector();
        } catch (SerDeException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(K key, OrcStruct val,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        System.out.println(val); // debug: dump the raw row

        List<? extends StructField> fields = oip.getAllStructFieldRefs();

        // Read the string column b
        StringObjectInspector bInspector =
                (StringObjectInspector) fields.get(B_ID).getFieldObjectInspector();
        String b = "garbage";
        try {
            b = bInspector.getPrimitiveJavaObject(
                    oip.getStructFieldData(serde.deserialize(val), fields.get(B_ID)));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }

        // Read the nested struct column c = struct<d:int,e:string>
        OrcStruct struct = null;
        try {
            struct = (OrcStruct) oip.getStructFieldData(serde.deserialize(val), fields.get(C_ID));
        } catch (SerDeException e1) {
            e1.printStackTrace();
        }
        StructObjectInspector cInspector =
                (StructObjectInspector) fields.get(C_ID).getFieldObjectInspector();
        List<? extends StructField> cFields = cInspector.getAllStructFieldRefs();
        int d = ((IntWritable) cInspector.getStructFieldData(struct, cFields.get(D_ID))).get();
        String e = cInspector.getStructFieldData(struct, cFields.get(E_ID)).toString();

        output.collect(new Text(b), new IntWritable(1));
        output.collect(new Text(e), new IntWritable(1));
    }
}
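The driver below also references OrcReader.MyReducer as combiner and reducer. That class is not part of the original sample; a minimal sum reducer along these lines (an assumption on my part, matching the (Text, IntWritable(1)) pairs the mapper emits) would fit:

// Assumed reducer: counts how many times each key was emitted by the mapper.
public static class MyReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}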
Driver code:
JobConf job = new JobConf(new Configuration(), OrcReader.class);
// Specify various job-specific parameters
job.setJobName("myjob");
job.set("mapreduce.framework.name","local");
job.set("fs.default.name","file:///");
job.set("log4j.logger.org.apache.hadoop","INFO");
job.set("log4j.logger.org.apache.hadoop","INFO");
//push down projection columns
job.set("hive.io.file.readcolumn.ids","1,2");
job.set("hive.io.file.read.all.columns","false");
job.set("hive.io.file.readcolumn.names","b,c");
FileInputFormat.setInputPaths(job, new Path("./src/main/resources/000000_0.orc"));
FileOutputFormat.setOutputPath(job, new Path("./target/out1"));
job.setMapperClass(OrcReader.MyMapper.class);
job.setCombinerClass(OrcReader.MyReducer.class);
job.setReducerClass(OrcReader.MyReducer.class);
job.setInputFormat(OrcInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
JobClient.runJob(job);
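The question also asks about writing data with the ORCFile output format, which the sample above does not cover. One way to produce an ORC file from Java is the Writer API that ships in hive-exec; the sketch below is only an illustration under that assumption (the Row class, the output path, and the sample values are made up), not part of the original answer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

public class OrcWriterExample {
    // Simple row type; the reflection ObjectInspector derives the ORC schema from its fields.
    static class Row {
        int a;
        String b;
        Row(int a, String b) { this.a = a; this.b = b; }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
                Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        Writer writer = OrcFile.createWriter(new Path("./target/sample.orc"),
                OrcFile.writerOptions(conf).inspector(inspector));
        writer.addRow(new Row(1, "hello"));
        writer.addRow(new Row(2, "world"));
        writer.close();
    }
}

A file written this way can then be read back through OrcInputFormat exactly as in the mapper above.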