Hadoop 2.4.0 + HCatalog + Mapreduce



在Hadoop 2.4.0中,我在执行下面的代码示例时得到以下错误。我认为是Hadoop版本不匹配造成的。能帮我检查一下代码吗?我该如何修正这些代码?

我正在尝试编写一个map-reduce作业,用来复制HCatalog表。

谢谢。

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.getJobInfo(HCatBaseOutputFormat.java:94)
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:82)
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.checkOutputSpecs(HCatBaseOutputFormat.java:72)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at org.deneme.hadoop.UseHCat.run(UseHCat.java:102)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at org.deneme.hadoop.UseHCat.main(UseHCat.java:107)

代码示例

/**
 * MapReduce job that copies (name, id) pairs from one HCatalog table to another.
 *
 * <p>Usage: {@code UseHCat [generic options] <input table> <output table>}. Both tables are
 * looked up in the default database.
 *
 * <p>NOTE: the HCatalog jars on the classpath must be built against the same Hadoop major
 * version as the cluster. Running HCatalog classes compiled for Hadoop 1.x on Hadoop 2.x fails
 * at job submission with {@code IncompatibleClassChangeError: Found interface
 * org.apache.hadoop.mapreduce.JobContext, but class was expected}.
 */
public class UseHCat extends Configured implements Tool {

    /** Exit code returned by {@link #run(String[])} when the table arguments are missing. */
    private static final int EXIT_USAGE = 2;

    /**
     * Reads column 0 (group name) and column 2 (group id) from each input record and emits
     * them as a (name, id) pair.
     */
    public static class Map extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {

        /**
         * @param key     input key; not used
         * @param value   input record; column 0 is read as a String, column 2 as an Integer
         * @param context sink for the (name, id) pair
         */
        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            // The group table from /etc/group has name, 'x', id
            String groupName = (String) value.get(0);
            int id = (Integer) value.get(2);
            // Just select and emit the name and ID
            context.write(new Text(groupName), new IntWritable(id));
        }
    }

    /**
     * Turns each (name, ids) group into a two-column HCatalog record (name, id), using only
     * the first id of the group.
     */
    public static class Reduce
            extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {

        /**
         * @param key     group name
         * @param values  ids seen for that name; only the first one is used
         * @param context sink for the output record (written with a null key)
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Only expecting one ID per group name
            int id = values.iterator().next().get();
            // Emit the group name and ID as a record
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, key.toString());
            record.set(1, id);
            context.write(null, record);
        }
    }

    /**
     * Configures and runs the copy job.
     *
     * @param args {@code <input table> <output table>}, optionally preceded by Hadoop generic
     *             options
     * @return 0 if the job succeeded, 1 if it failed, 2 if the table arguments are missing
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Re-parsing is redundant when launched through ToolRunner (which has already stripped
        // the generic options), but keeps direct calls to run() working the same way.
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (args.length < 2) {
            System.err.println(
                    "Usage: UseHCat [generic options] <input table> <output table>");
            return EXIT_USAGE;
        }
        // Get the input and output table names as arguments
        String inputTableName = args[0];
        String outputTableName = args[1];
        // Assume the default database
        String dbName = null;

        String jobName = "UseHCat";
        String userChosenName = conf.get(JobContext.JOB_NAME);
        if (userChosenName != null) {
            jobName += ": " + userChosenName;
        }
        Job job = Job.getInstance(conf);
        job.setJobName(jobName);
        job.setJarByClass(UseHCat.class);

        // An HCatalog record as input
        HCatInputFormat.setInput(job, dbName, inputTableName);
        job.setInputFormatClass(HCatInputFormat.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // Mapper emits a string as key and an integer as value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Ignore the key for the reducer output; emitting an HCatalog record as value
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);
        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
        // Write using the schema of the output table itself.
        HCatSchema s = HCatOutputFormat.getTableSchema(job.getConfiguration());
        System.err.println("INFO: output schema explicitly set for writing:" + s);
        HCatOutputFormat.setSchema(job, s);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; exits with the code returned by {@link #run(String[])}.
     *
     * @param args command-line arguments, passed through {@code ToolRunner}
     * @throws Exception if the job cannot be set up or submitted
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new UseHCat(), args);
        System.exit(exitCode);
    }
}

在Hadoop 1.x中,JobContext是一个类;而在Hadoop 2.x中,它是一个接口。因此针对Hadoop 1.x编译的hcatalog核心api与hadoop 2.x.x不兼容。

需要在HCatBaseOutputFormat类中修改以下代码来解决这个问题:

//JobContext ctx = new JobContext(conf,jobContext.getJobID());
JobContext ctx = new Job(conf);

相关内容

  • 没有找到相关文章

最新更新