如何为这种场景编写MapReduce代码



假设我有一个如下所示的输入文件

dept_id emp_id  salary
1       13611   1234
2       13609   3245
3       13612   3251
2       13623   1232
1       13619   6574
3       13421   234

现在我想找出每个部门的平均工资。比如下面的Hive查询:

SELECT dept_id, avg(salary) FROM dept GROUP BY dept_id

这会返回输出:

dept_id avg_sal
----------------
  1     3904.0
  2     2238.5
  3     1742.5

现在,我想做的是生成相同的输出,但是使用mapreduce框架。那么怎么写呢?提前感谢!

重要:在尝试实现它之前,首先尝试MapReduce中的一些基本示例,例如实现单词计数程序,以了解逻辑,甚至在此之前,阅读有关MapReduce如何工作的书籍或教程。

聚合东西(比如找到平均值)的想法是在map阶段按键(部门id)分组,然后在reduce阶段减少特定部门的所有工资。

用更正式的方式:

地图:

输入:一行表示工资记录(即,dep_id, emp_id, salary)
输出(键,值):(dep_id, salary)

减少:

input (key, values): (dep_id, salaries:包含此dep_id的薪资值列表)
输出(键,值):(dep_id, avg(salary))

这样,所有属于同一部门的工资将由同一个减速器处理。在这个减速器中,你所要做的就是找到输入值的平均值。

code----

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AverageSalary {
  public static class AvgMapper
       extends Mapper<Object, Text, Text, FloatWritable>{
    private Text dept_id = new Text();
    private FloatWritable salary = new FloatWritable(); 
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String values[] = value.toString().split("t");
        dept_id.set(values[0]);
        salary.set(Float.parseFloat(values[2]));
        context.write(dept_id, salary);
    }
  }
  public static class AvgReducer
       extends Reducer<Text,FloatWritable,Text,FloatWritable> {
    private FloatWritable result = new FloatWritable();
    public void reduce(Text key, Iterable<FloatWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      float sum = 0;
      float count = 0;
      for (FloatWritable val : values) {
        sum += val.get();
        count++;
      }
      result.set(sum/count);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "average salary");
    job.setJarByClass(AverageSalary.class);
    job.setMapperClass(AvgMapper.class);
    job.setCombinerClass(AvgReducer.class);
    job.setReducerClass(AvgReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.addInputPath(job, new Path("/home/kishore/Data/mapreduce.txt"));  // input path
    FileOutputFormat.setOutputPath(job, new Path("/home/kishore/Data/map3")); // output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
output 
1   3904.0
2   2238.5
3   1742.5

如果你还没有接受过任何培训计划,请访问你的tube上由edureka提供的免费视频,以更好地理解概念:

映射器

Mapper将输入键/值对映射到一组中间键/值对。

映射是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录具有相同的类型。一个给定的输入对可以映射到零个或多个输出对。

齿轮

Reducer将一组共享键的中间值简化为一个更小的值集。

作业的reduce个数由用户通过job . setnumreducetasks (int)设置。

在Apache Hadoop网站上的工作示例:Word Count示例

对于您的用例,简单使用单词计数示例是不够的。您必须在Mapper上使用组合器和分区器,因为您正在使用Group by。访问此视频:Advanced Map reduce

相关内容

  • 没有找到相关文章