I am trying to run a MapReduce program in Java with 2 input files and 2 mappers.
Here is the code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CounterMapper {

    public static class MyMap1 extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            int age = Integer.parseInt(line[26]);
            context.write(new Text(line[7]), new LongWritable(age));
        }
    }

    public static class MyMap2 extends Mapper {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            int age = Integer.parseInt(line[26]);
            context.write(new Text(line[7]), new LongWritable(age));
        }
    }

    public static class MyRed extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        String line = null;

        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            for (LongWritable value : values) {
                line = value.toString();
            }
            context.write(key, new LongWritable());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "ProjectQuestion2");
        job.setJarByClass(CounterMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setNumReduceTasks(1);
        job.setMapperClass(MyMap1.class);
        job.setMapperClass(MyMap2.class);
        job.setReducerClass(MyRed.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, MyMap1.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, MyMap2.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
After running the job, I get the following error:
INFO mapreduce.Job: Task Id : attempt_1486434709675_0016_m_000000_2, Status : FAILED
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run(DelegatingMapper.java:55)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Any input is appreciated... thanks.
Most likely the map() method being invoked is the one from the base Mapper class rather than yours. Since the base implementation is an identity (pass-through) mapper, it emits the LongWritable input key unchanged where a Text key is expected, which matches the error you are seeing.
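The mechanism can be shown without Hadoop at all. The sketch below uses a hypothetical Base class standing in for Mapper: when a subclass extends the raw generic type, its more specific method becomes an overload rather than an override, so a framework calling through the base type still hits the base "identity" method.

```java
// Stands in for Hadoop's Mapper: a generic base with a pass-through method.
class Base<K, V> {
    public String map(K key, V value) {
        return "identity"; // the default, like Mapper's identity map()
    }
}

// Like MyMap2: extends the raw type, so the base method is map(Object, Object)
// and map(Long, String) is a new overload, NOT an override.
@SuppressWarnings("rawtypes")
class RawSub extends Base {
    public String map(Long key, String value) {
        return "custom";
    }
}

// Like the fixed MyMap2: type parameters declared, so the override takes effect.
class TypedSub extends Base<Long, String> {
    @Override
    public String map(Long key, String value) {
        return "custom";
    }
}

public class OverrideDemo {
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        Base raw = new RawSub();
        Base<Long, String> typed = new TypedSub();
        // The framework always calls through the base type, as Mapper.run() does:
        System.out.println(raw.map(1L, "x"));   // prints "identity" - base method runs
        System.out.println(typed.map(1L, "x")); // prints "custom" - override runs
    }
}
```

Note that putting @Override on RawSub.map() would turn this silent bug into a compile error, which is exactly why the annotation is worth adding.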
I would do a couple of things:
- In MyMap2, change "extends Mapper" to "extends Mapper<LongWritable, Text, Text, LongWritable>".
- Add the @Override annotation to the map() methods to make sure they really override the base Mapper class method; with the annotation in place, the compiler will reject any signature that does not match.
You can also make two improvements:
- Change "Job job = new Job(conf, "ProjectQuestion2");" to "Job job = Job.getInstance(conf, "ProjectQuestion2");" to remove the deprecation warning.
- job.setMapOutputKeyClass() and job.setMapOutputValueClass() are each called twice; you can remove one pair.
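Putting those driver fixes together, main() might look like the sketch below (a configuration fragment, assuming the same class and argument layout as your code; note that MultipleInputs.addInputPath() already binds each mapper to its input path, so the two job.setMapperClass() calls can be dropped as well):

```java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Job.getInstance() replaces the deprecated Job constructor
    Job job = Job.getInstance(conf, "ProjectQuestion2");
    job.setJarByClass(CounterMapper.class);
    job.setReducerClass(MyRed.class);
    job.setNumReduceTasks(1);
    // one pair of map-output type declarations is enough
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    // MultipleInputs assigns a mapper per input path,
    // so no job.setMapperClass() calls are needed
    MultipleInputs.addInputPath(job, new Path(args[0]),
            TextInputFormat.class, MyMap1.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
            TextInputFormat.class, MyMap2.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```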