假设我有一个如下所示的输入文件
dept_id emp_id salary
1 13611 1234
2 13609 3245
3 13612 3251
2 13623 1232
1 13619 6574
3 13421 234
现在我想找出每个部门的平均工资。比如下面的Hive查询:
SELECT dept_id, avg(salary) FROM dept GROUP BY dept_id
这会返回输出:
dept_id avg_sal
----------------
1 3904.0
2 2238.5
3 1742.5
现在,我想做的是生成相同的输出,但是使用mapreduce框架。那么怎么写呢?提前感谢!
重要:在尝试实现它之前,首先尝试MapReduce中的一些基本示例,例如实现单词计数程序,以了解逻辑,甚至在此之前,阅读有关MapReduce如何工作的书籍或教程。
聚合东西(比如找到平均值)的想法是在map阶段按键(部门id)分组,然后在reduce阶段减少特定部门的所有工资。
用更正式的方式:
地图:
输入:一行表示工资记录(即,dep_id, emp_id, salary)
输出(键,值):(dep_id, salary)
减少:
input (key, values): (dep_id, salaries:包含此dep_id的薪资值列表)
输出(键,值):(dep_id, avg(salary))
这样,所有属于同一部门的工资将由同一个减速器处理。在这个减速器中,你所要做的就是找到输入值的平均值。
code----
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AverageSalary {
public static class AvgMapper
extends Mapper<Object, Text, Text, FloatWritable>{
private Text dept_id = new Text();
private FloatWritable salary = new FloatWritable();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String values[] = value.toString().split("t");
dept_id.set(values[0]);
salary.set(Float.parseFloat(values[2]));
context.write(dept_id, salary);
}
}
public static class AvgReducer
extends Reducer<Text,FloatWritable,Text,FloatWritable> {
private FloatWritable result = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values,
Context context
) throws IOException, InterruptedException {
float sum = 0;
float count = 0;
for (FloatWritable val : values) {
sum += val.get();
count++;
}
result.set(sum/count);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "average salary");
job.setJarByClass(AverageSalary.class);
job.setMapperClass(AvgMapper.class);
job.setCombinerClass(AvgReducer.class);
job.setReducerClass(AvgReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path("/home/kishore/Data/mapreduce.txt")); // input path
FileOutputFormat.setOutputPath(job, new Path("/home/kishore/Data/map3")); // output path
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
output
1 3904.0
2 2238.5
3 1742.5
如果你还没有接受过任何培训计划,请访问你的tube上由edureka提供的免费视频,以更好地理解概念:
映射器
Mapper将输入键/值对映射到一组中间键/值对。
映射是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录具有相同的类型。一个给定的输入对可以映射到零个或多个输出对。
齿轮Reducer将一组共享键的中间值简化为一个更小的值集。
作业的reduce个数由用户通过job . setnumreducetasks (int)设置。
在Apache Hadoop网站上的工作示例:Word Count示例
对于您的用例,简单使用单词计数示例是不够的。您必须在Mapper上使用组合器和分区器,因为您正在使用Group by。访问此视频:Advanced Map reduce