Hadoop Total Order Partitioner


import java.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.lib.partition.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.util.*;
/**
 * Demonstrates how to use Total Order Partitioner on Word Count.
 */
public class TotalOrderPartitionerExample {
public static class WordCount extends Configured implements Tool {
private final static int REDUCE_TASKS = 8;
    public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new WordCount(), args);
      System.exit(exitCode);
    }
    @Override @SuppressWarnings({ "unchecked", "rawtypes" })
    public int run(String[] args) throws Exception {
      // Check arguments.
      if (args.length != 2) {
        String usage =
          "Usage: " +
          "hadoop jar TotalOrderPartitionerExample$WordCount " +
          "<input dir> <output dir>n"
        System.out.printf(usage);
        System.exit(-1);
      }
      String jobName = "WordCount";
      String mapJobName = jobName + "-Map";
      String reduceJobName = jobName + "-Reduce";
      // Get user args.
      String inputDir = args[0];
      String outputDir = args[1];
      // Define input path and output path.
      Path mapInputPath = new Path(inputDir);
      Path mapOutputPath = new Path(outputDir + "-inter");
      Path reduceOutputPath = new Path(outputDir);
      // Define partition file path.
      Path partitionPath = new Path(outputDir + "-part.lst");
      // Configure map-only job for sampling.
      Job mapJob = new Job(getConf());
      mapJob.setJobName(mapJobName);
      mapJob.setJarByClass(WordCount.class);
      mapJob.setMapperClass(WordMapper.class);
      mapJob.setNumReduceTasks(0);
      mapJob.setOutputKeyClass(Text.class);
      mapJob.setOutputValueClass(IntWritable.class);
      TextInputFormat.setInputPaths(mapJob, mapInputPath);
      // Set the output format to a sequence file.
      mapJob.setOutputFormatClass(SequenceFileOutputFormat.class);
      SequenceFileOutputFormat.setOutputPath(mapJob, mapOutputPath);
      // Submit the map-only job.
      int exitCode = mapJob.waitForCompletion(true) ? 0 : 1;
      if (exitCode != 0) { return exitCode; }
      // Set up the second job, the reduce-only.
      Job reduceJob = new Job(getConf());
      reduceJob.setJobName(reduceJobName);
      reduceJob.setJarByClass(WordCount.class);
      // Set the input to the previous job's output.
      reduceJob.setInputFormatClass(SequenceFileInputFormat.class);
      SequenceFileInputFormat.setInputPaths(reduceJob, mapOutputPath);
      // Set the output path to the final output path.
      TextOutputFormat.setOutputPath(reduceJob, reduceOutputPath);
      // Use identity mapper for key/value pairs in SequenceFile.
      reduceJob.setReducerClass(IntSumReducer.class);
      reduceJob.setMapOutputKeyClass(Text.class);
      reduceJob.setMapOutputValueClass(IntWritable.class);
      reduceJob.setOutputKeyClass(Text.class);
      reduceJob.setOutputValueClass(IntWritable.class);
      reduceJob.setNumReduceTasks(REDUCE_TASKS);
      // Use Total Order Partitioner.
      reduceJob.setPartitionerClass(TotalOrderPartitioner.class);
      // Generate partition file from map-only job's output.
      TotalOrderPartitioner.setPartitionFile(
          reduceJob.getConfiguration(), partitionPath);
      InputSampler.writePartitionFile(reduceJob, new InputSampler.RandomSampler(
          1, 10000));
      // Submit the reduce job.
      return reduceJob.waitForCompletion(true) ? 0 : 2;
    }
  }
  public static class WordMapper extends
      Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      for (String word : line.split("\W+")) {
        if (word.length() == 0) { continue; }
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}

我从github得到了这段代码。我比较了地图和减少的经过时间。常规字数比总字数统计效果更好。为什么?需要进行任何优化或更改才能满足平均性能? 哈希分区程序性能与总订单分区程序性能?

是的,HashPartitioner 的性能会比 TotalOrderPartitioner 更好,因为 HashPartitioner 没有开销或运行 InputSampler 并写入分区文件等,

TotalOrderPartitioner仅在需要全局排序输出时使用,并且比HashPartitioner慢。

相关内容

  • 没有找到相关文章

最新更新