"ClassCastExceptionclass org.apache.avro.mapred.AvroKey"是怎么回事?



I'm writing MapReduce jobs with Avro and am a complete beginner with it. Both the input and the output are in Avro format with a specific schema.

Here are my mapper and reducer, using the MR1 mapreduce API:

public class UserClassifyMapReduce extends Configured implements Tool {
    private final static Logger logger = LoggerFactory.getLogger(UserClassifyMapReduce.class);
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new UserClassifyMapReduce(), args);
        System.exit(res);
    }
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            logger.error("Usage: UserClassify <intputfile> <outputfolder>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = new Job(getConf());
        job.setJobName("UserClassify");
        AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
        AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        outPath.getFileSystem(conf).delete(outPath, true);
        job.setJarByClass(DataSerializeMapReduce.class);
        job.setMapperClass(MyAvroMap.class);
        job.setReducerClass(MyAvroReduce.class);
        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(AvroValue.class);
        job.setOutputKeyClass(AvroKey.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
                                        AvroKey<CharSequence>, AvroValue<NetflowRecord>>{
        @Override
        protected void map(AvroKey<NetflowRecord> key, NullWritable value, Context context) 
                                        throws IOException, InterruptedException{
            CharSequence devMac = key.datum().getDevMacAddr();
            context.write(new AvroKey<CharSequence>(devMac), new AvroValue<NetflowRecord>(key.datum()));
        }
    }
    public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
                                        AvroKey<NetflowRecord>, NullWritable>{
        @Override
        protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<NetflowRecord>> values, Context context) 
                                        throws IOException, InterruptedException{
            (...code)
        }
    }
}

It throws a ClassCastException like this:

    java.lang.Exception: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
    at java.lang.Class.asSubclass(Class.java:3116)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
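The frame that matters is `JobConf.getOutputKeyComparator`: when no raw comparator has been registered for the map output key class, MR1 falls back to checking that the key class is a `WritableComparable` via `Class.asSubclass`, and `AvroKey` is not one, so `asSubclass` throws. The exception message is simply the class's `toString()`, which is why it reads "class org.apache.avro.mapred.AvroKey". A minimal stand-alone sketch of that mechanism (plain Java, no Hadoop required; here `Runnable` stands in for `WritableComparable` and `String` for `AvroKey`):

```java
// Minimal sketch of why Class.asSubclass produces this exact message.
// Assumption: Runnable stands in for Hadoop's WritableComparable, and
// String stands in for AvroKey (neither implements the interface).
public class AsSubclassDemo {
    public static void main(String[] args) {
        Class<?> mapOutputKeyClass = String.class;
        try {
            // Analogous to Hadoop's fallback check when no raw comparator
            // is registered for the map output key class:
            mapOutputKeyClass.asSubclass(Runnable.class);
        } catch (ClassCastException e) {
            // The message is just the class's toString(),
            // mirroring "class org.apache.avro.mapred.AvroKey" in the trace.
            System.out.println("ClassCastException: " + e.getMessage());
            // prints: ClassCastException: class java.lang.String
        }
    }
}
```

Registering the Avro schemas (see the accepted fix) makes `AvroJob` install `AvroKeyComparator` for the key class, so this fallback path is never taken.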

A very simple program. Do you have any idea what the problem is? Thanks a lot.

Jamin

It looks like you are missing the schema for the mapper output key, AvroKey&lt;CharSequence&gt;. Adding the corresponding schema should be enough:

AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
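For reference, here is a sketch of the schema registration the driver's `run()` would contain (an assumption-laden fragment, reusing the question's generated `NetflowRecord` class; since the map output *value* is `AvroValue<NetflowRecord>`, registering its schema as well is generally needed too):

```java
// Sketch only: schema registration for the job in the question.
// setMapOutputKeySchema also installs AvroKeyComparator for the key
// class, which is what avoids the ClassCastException in the sort phase.
AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema());
AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
```

With these calls in place, the explicit `job.setMapOutputKeyClass(AvroKey.class)` / `setMapOutputValueClass(AvroValue.class)` calls become redundant, as `AvroJob` sets them for you.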
