在Hadoop 2.4.0中执行下面的代码示例时,我遇到了以下错误。我认为这是hadoop版本不匹配造成的。能帮忙检查一下代码吗?我该如何修正这些代码?
我正在尝试编写一个map-reduce作业,用来复制HCatalog表。
谢谢。
Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.getJobInfo(HCatBaseOutputFormat.java:94)
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:82)
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.checkOutputSpecs(HCatBaseOutputFormat.java:72)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at org.deneme.hadoop.UseHCat.run(UseHCat.java:102)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at org.deneme.hadoop.UseHCat.main(UseHCat.java:107)
代码示例
/**
 * MapReduce tool that reads records from one HCatalog table and writes
 * (name, id) records into another HCatalog table.
 *
 * <p>Usage: {@code UseHCat [generic options] <input table> <output table>}. Both tables are
 * looked up with a {@code null} database name.
 *
 * <p>NOTE(review): the reported {@code IncompatibleClassChangeError} is raised inside
 * {@code org.apache.hcatalog.mapreduce.HCatBaseOutputFormat}, not in this class. It presumably
 * means the HCatalog jar on the classpath was compiled against a different Hadoop major
 * version than the one running the job -- confirm the jar versions before changing this code.
 */
public class UseHCat extends Configured implements Tool {

    /** Exit code returned by {@link #run(String[])} when the table names are missing. */
    private static final int EXIT_USAGE = 2;

    /**
     * Emits (column 0 as text, column 2 as int) for every input record.
     */
    public static class Map extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> {

        /**
         * @param key     input key; not used
         * @param value   input record; column 0 is read as a {@code String}, column 2 as an
         *                {@code Integer}
         * @param context sink for the (name, id) pair
         */
        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            // The group table from /etc/group has name, 'x', id
            String groupName = (String) value.get(0);
            int id = (Integer) value.get(2);
            // Just select and emit the name and ID
            context.write(new Text(groupName), new IntWritable(id));
        }
    }

    /**
     * Turns each (name, ids) group into one two-column HCatalog record built from the name and
     * the first id.
     */
    public static class Reduce
            extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {

        /**
         * @param key     group name, written to column 0 of the output record
         * @param values  ids for the group; only the first one is used
         * @param context sink for the output record (written with a {@code null} key)
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Only expecting one ID per group name
            Iterator<IntWritable> ids = values.iterator();
            int id = ids.next().get();

            // Emit the group name and ID as a record
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, key.toString());
            record.set(1, id);
            context.write(null, record);
        }
    }

    /**
     * Configures and runs the copy job.
     *
     * @param args command-line arguments; after generic Hadoop options are stripped,
     *             {@code args[0]} is the input table name and {@code args[1]} the output
     *             table name
     * @return 0 if the job succeeded, 1 if it failed, 2 if the table names were not supplied
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println(
                    "Usage: UseHCat [generic options] <input table> <output table>");
            return EXIT_USAGE;
        }

        // Get the input and output table names as arguments
        String inputTableName = args[0];
        String outputTableName = args[1];
        // Assume the default database
        String dbName = null;

        String jobName = "UseHCat";
        String userChosenName = conf.get(JobContext.JOB_NAME);
        if (userChosenName != null) {
            jobName += ": " + userChosenName;
        }

        Job job = Job.getInstance(conf);
        job.setJobName(jobName);
        job.setJarByClass(UseHCat.class);

        // An HCatalog record as input
        HCatInputFormat.setInput(job, dbName, inputTableName);
        job.setInputFormatClass(HCatInputFormat.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Mapper emits a string as key and an integer as value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Ignore the key for the reducer output; emitting an HCatalog record as value
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);

        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
        // Read the output table's schema back and set it explicitly for writing.
        HCatSchema schema = HCatOutputFormat.getTableSchema(job.getConfiguration());
        System.err.println("INFO: output schema explicitly set for writing:" + schema);
        HCatOutputFormat.setSchema(job, schema);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Runs the tool through {@link ToolRunner} and exits the JVM with its return code.
     *
     * @param args command-line arguments, passed through to {@link #run(String[])}
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new UseHCat(), args);
        System.exit(exitCode);
    }
}
在Hadoop 1.x中,JobContext是一个类;而在Hadoop 2.x中,它是一个接口。因此,针对Hadoop 1.x编译的hcatalog核心api与hadoop 2.x.x不兼容。
需要修改HCatBaseOutputFormat类中的以下代码来解决这个问题:
// Previous form, kept for reference: constructs JobContext directly from the
// configuration and the incoming job ID.
//JobContext ctx = new JobContext(conf,jobContext.getJobID());
// Replacement: obtain the context by creating a Job from the same configuration.
// NOTE(review): assumes Job is assignable to JobContext in the Hadoop version in use,
// and that the job ID carried by the old form is not needed downstream -- confirm.
JobContext ctx = new Job(conf);