Hadoop MapReduce output only contains the last entry when run on the cluster



My job counts the hashtags in tweets and then displays the 15 most frequent ones.

When I test it locally from the test class it works fine, but on the cluster it only produces the last computed entry.

Mapper

public class HashtagMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Keep only letters, digits and '#', then drop '#' characters that stand alone.
        String cleanValue = value.toString().replaceAll("[^A-Za-z0-9#]", " ");
        cleanValue = cleanValue.replace("# ", " ");
        StringTokenizer itr = new StringTokenizer(cleanValue, " ");
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken().trim());
            // Emit <hashtag, 1> for every token that starts with '#'.
            if (word.find("#", 0) == 0) {
                context.write(new Text(word), ONE);
            }
        }
    }
}

Reducer

public class HashtagReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    private final HashMultimap<Text, Integer> hashtags = HashMultimap.create();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts for this hashtag and keep the result until cleanup().
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        hashtags.put(key, sum);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Sort the accumulated counts descending and emit at most the top 15.
        Map<Text, Collection<Integer>> sortedMap = sortByComparator(hashtags.asMap());
        Set<Text> keys = sortedMap.keySet();
        int index = 0;
        for (Text key : keys) {
            context.write(new Text(key), new DoubleWritable(sortedMap.get(key).iterator().next()));
            index++;
            if (index == Math.min(keys.size() - 1, 15)) {
                break;
            }
        }
    }

    // Order the map entries by their count, highest first.
    private static Map<Text, Collection<Integer>> sortByComparator(Map<Text, Collection<Integer>> unsortedMap) {
        List<Map.Entry<Text, Collection<Integer>>> list = new LinkedList<Map.Entry<Text, Collection<Integer>>>(unsortedMap.entrySet());
        Collections.sort(list, Collections.reverseOrder(new Comparator<Map.Entry<Text, Collection<Integer>>>() {
            public int compare(Map.Entry<Text, Collection<Integer>> o1,
                               Map.Entry<Text, Collection<Integer>> o2) {
                return (o1.getValue()).iterator().next().compareTo(o2.getValue().iterator().next());
            }
        }));
        Map<Text, Collection<Integer>> sortedMap = new LinkedHashMap<Text, Collection<Integer>>();
        for (Map.Entry<Text, Collection<Integer>> entry : list) {
            sortedMap.put(entry.getKey(), entry.getValue());
        }
        return sortedMap;
    }
}

Driver

public class StubDriver {

    public static void main(String[] args) throws Exception {
        /*
         * Validate that two arguments were passed from the command line.
         */
        if (args.length != 2) {
            System.out.printf("Usage: StubDriver <input dir> <output dir>\n");
            System.exit(-1);
        }
        /*
         * Instantiate a Job object for your job's configuration.
         */
        Job job = Job.getInstance();
        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to nodes in your cluster running
         * mapper and reducer tasks.
         */
        job.setJarByClass(StubDriver.class);
        /*
         * Specify an easily-decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("StubDriver");
        job.setMapperClass(HashtagMapper.class);
        job.setReducerClass(HashtagReducer.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        /*
         * Start the MapReduce job and wait for it to finish.
         * If it finishes successfully, return 0. If not, return 1.
         */
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}

Test

public class HashtagTest {

    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable> mapReduceDriver;

    @Before
    public void setUp() {
        HashtagMapper hashtagMapper = new HashtagMapper();
        HashtagReducer hashtagReducer = new HashtagReducer();
        mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
        mapDriver.setMapper(hashtagMapper);
        mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable>();
        mapReduceDriver.setMapper(hashtagMapper);
        mapReduceDriver.setReducer(hashtagReducer);
    }

    @Test
    public void testMapReducer_3() {
        try {
            // Read the whole test file into one string and feed it to the MRUnit driver.
            File file = new File("/home/paul/Documents/Studium/Master Bussines Informatics/3 Semester/Business Intelligence/Part 2/tweets/small.json");
            FileInputStream fileInputStream = new FileInputStream(file);
            byte[] data = new byte[(int) file.length()];
            fileInputStream.read(data);
            fileInputStream.close();
            String str = new String(data, "UTF-8");
            mapReduceDriver
                    .withInput(new LongWritable(), new Text(str))
                    .withOutput(new Text("#Like"), new DoubleWritable(77))
                    .withOutput(new Text("#Retweet"), new DoubleWritable(75))
                    .withOutput(new Text("#Malerei"), new DoubleWritable(35))
                    .withOutput(new Text("#N"), new DoubleWritable(35))
                    .withOutput(new Text("#nagelstudio"), new DoubleWritable(35))
                    .withOutput(new Text("#nailart"), new DoubleWritable(35))
                    .withOutput(new Text("#Crystalnails"), new DoubleWritable(35))
                    .withOutput(new Text("#babyboomer"), new DoubleWritable(35))
                    .withOutput(new Text("#Geln"), new DoubleWritable(35))
                    .withOutput(new Text("#GelN"), new DoubleWritable(35))
                    .withOutput(new Text("#Muster"), new DoubleWritable(35))
                    .withOutput(new Text("#NagelstudioWien"), new DoubleWritable(35))
                    .withOutput(new Text("#GoodVibeTribe"), new DoubleWritable(24))
                    .withOutput(new Text("#1DPL"), new DoubleWritable(24))
                    .withOutput(new Text("#Przesz"), new DoubleWritable(22))
                    .runTest();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Does anyone have any tips?

I solved the problem by changing the following in the Reducer (see the sketch after the list):

  • public void cleanup(Context context) => public void cleanup(Reducer.Context context)
  • hashtags.put() => this.hashtags.put()
  • sortedMap = sortByComparator(hashtags.asMap()) => sortedMap = sortByComparator(this.hashtags.asMap())
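
For reference, with those three changes applied, the affected part of the reducer would look roughly like the fragment below. This is only a sketch of the two changed methods; the imports, the hashtags field, and the sortByComparator(...) helper stay exactly as in the code posted above.

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    this.hashtags.put(key, sum);          // change 2: qualify the field access with this.
}

@Override
public void cleanup(Reducer.Context context) throws IOException, InterruptedException {
    // change 1: Reducer.Context in the signature
    // change 3: qualify the field access with this.
    Map<Text, Collection<Integer>> sortedMap = sortByComparator(this.hashtags.asMap());
    Set<Text> keys = sortedMap.keySet();
    int index = 0;
    for (Text key : keys) {
        context.write(new Text(key), new DoubleWritable(sortedMap.get(key).iterator().next()));
        index++;
        if (index == Math.min(keys.size() - 1, 15)) {
            break;
        }
    }
}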
