我创建了一个名为DataObject的自定义类。我想在Map函数中填充这个对象的值然后把这个对象发送给reduce函数。以下是我的代码。但是,我得到以下错误。
java.lang.Exception: java.lang.ClassCastException: class DataObject
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class DataObject
at java.lang.Class.asSubclass(Class.java:3208)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:422)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/07 17:47:57 INFO mapred.JobClient: map 0% reduce 0%
15/10/07 17:47:57 INFO mapred.JobClient: Job complete: job_local582994215_0001
15/10/07 17:47:57 INFO mapred.JobClient: Counters: 0
15/10/07 17:47:57 INFO mapred.JobClient: Job Failed: NA
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1357)
at WordCount.main(WordCount.java:103)
Following is my program-
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, DataObject, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable arg0, Text value, OutputCollector<DataObject, IntWritable> output,
Reporter reporter) throws IOException {
// TODO Auto-generated method stub
FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
DataObject dObject = new DataObject();
dObject.setFileName(fileName);
dObject.setWord(value);
word.set(tokenizer.nextToken());
output.collect(dObject, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<DataObject, IntWritable, Text, IntWritable> {
@Override
public void reduce(DataObject dObject, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
// TODO Auto-generated method stub
int sum = 0;
String[] inputValue = new String[] { "if", "the" };
if (Arrays.asList(inputValue).contains(dObject.getWord().toString())) {
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(dObject.getWord(), new IntWritable(sum));
}
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(DataObject.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
如果你的映射器与你的reducer发出不同的键和值,那么我们必须在Job配置中分别指定它们。
//减速机的配置
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
//映射器的配置
conf.setMapOutputKeyClass(DataObject.class);
conf.setMapOutputValueClass(IntWritable.class);
在代码中同时包含....