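I'm writing a MapReduce job with Avro, and I'm a real beginner with Avro. Both the input and the output are Avro files with a specific schema.
Here are my mapper and reducer, using the mapreduce API on MR1: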
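import java.io.IOException;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// NetflowRecord is the Avro-generated specific record class (its package is not shown).
public class UserClassifyMapReduce extends Configured implements Tool {
    private final static Logger logger = LoggerFactory.getLogger(UserClassifyMapReduce.class);

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new UserClassifyMapReduce(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            logger.error("Usage: UserClassify <inputfile> <outputfolder>");
            return -1;
        }
        Job job = new Job(getConf());
        job.setJobName("UserClassify");

        // Input and final output are both NetflowRecord Avro files.
        AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
        AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // Remove any previous output so submission does not fail on an existing directory.
        outPath.getFileSystem(getConf()).delete(outPath, true);

        job.setJarByClass(UserClassifyMapReduce.class);
        job.setMapperClass(MyAvroMap.class);
        job.setReducerClass(MyAvroReduce.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        // Intermediate (map output) and final output key/value classes.
        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(AvroValue.class);
        job.setOutputKeyClass(AvroKey.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Emits each record keyed by its device MAC address.
    public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
            AvroKey<CharSequence>, AvroValue<NetflowRecord>> {
        @Override
        protected void map(AvroKey<NetflowRecord> key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            CharSequence devMac = key.datum().getDevMacAddr();
            context.write(new AvroKey<CharSequence>(devMac), new AvroValue<NetflowRecord>(key.datum()));
        }
    }

    // Receives all records that share one device MAC address.
    public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
            AvroKey<NetflowRecord>, NullWritable> {
        @Override
        protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<NetflowRecord>> values, Context context)
                throws IOException, InterruptedException {
            // (...code)
        }
    }
}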
The job fails with a ClassCastException like this:
java.lang.Exception: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
at java.lang.Class.asSubclass(Class.java:3116)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
It's a very simple program. Do you have any idea what is causing this? Thanks a lot.
杰明
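It looks like you are missing a schema for the mapper output key, AvroKey<CharSequence>. The stack trace points at this: when no sort comparator is configured, JobConf.getOutputKeyComparator() falls back to treating the map output key class as a WritableComparable (that is the Class.asSubclass call), and AvroKey does not implement WritableComparable, hence the ClassCastException. AvroJob.setMapOutputKeySchema registers the key schema with Avro's serialization and sets AvroKeyComparator as the sort comparator, so adding the corresponding schema should be enough: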
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
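To see why a missing schema shows up as a ClassCastException rather than a clearer error, here is a minimal, stdlib-only sketch that mimics the fallback visible in the stack trace. It does not use Hadoop or Avro: WritableComparableLike, AvroKeyLike, TextLike and AvroKeyComparatorLike are hypothetical stand-ins for WritableComparable, AvroKey, a Writable key such as Text, and AvroKeyComparator, and resolveComparator is only an approximation of JobConf.getOutputKeyComparator().

```java
// Stdlib-only simulation of the comparator fallback; all types are hypothetical stand-ins.
public class ComparatorFallbackDemo {

    // Stand-in for Hadoop's WritableComparable interface.
    interface WritableComparableLike {}

    // Stand-in for a Writable key type (e.g. Text) that implements the interface.
    static class TextLike implements WritableComparableLike {}

    // Stand-in for AvroKey: it does NOT implement the interface.
    static class AvroKeyLike {}

    // Stand-in for AvroKeyComparator, which setMapOutputKeySchema would register.
    static class AvroKeyComparatorLike {}

    /**
     * Approximates the lookup: use the configured sort comparator if there is one;
     * otherwise require the map output key class to be a WritableComparableLike.
     */
    static String resolveComparator(Class<?> configuredComparator, Class<?> mapOutputKeyClass) {
        if (configuredComparator != null) {
            return configuredComparator.getSimpleName();
        }
        // Class.asSubclass throws ClassCastException if the key class does not implement the interface.
        Class<? extends WritableComparableLike> keyClass =
                mapOutputKeyClass.asSubclass(WritableComparableLike.class);
        return "WritableComparator for " + keyClass.getSimpleName();
    }

    public static void main(String[] args) {
        // A Writable key works without any explicit comparator.
        System.out.println(resolveComparator(null, TextLike.class));

        // An AvroKey-like key with no comparator configured reproduces the failure mode.
        try {
            resolveComparator(null, AvroKeyLike.class);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }

        // Once a comparator is registered, the asSubclass check is never reached.
        System.out.println(resolveComparator(AvroKeyComparatorLike.class, AvroKeyLike.class));
    }
}
```

The same reasoning likely applies to the map output value: since it is an AvroValue<NetflowRecord>, the job will probably also need AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema()) so that Avro's serialization has a writer schema for the value.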