我正试图从Mapreduce作业中获得输出,我们可以从Hive查询中轻松获得该输出。我有一个数据集:
ID、颜色、费率
1,蓝色,2002,绿色,1703,黄色,4004,黄色,2305,绿色,1406,红色,5427,绿色,438,蓝色,2289,红色,190
现在我只想要输出速率超过200的那些行。因此,Hive中的查询将非常直接:
从table_name中选择id、color、rate,其中rate>200
现在,我正在尝试MapReduce代码,其中我认为只有Mapper就足以获得所需的输出。但无法计算Mapper输出。。
HiveMapper
public class HiveMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String [] line = value.toString().split(",");
if(Integer.parseInt(line[2]) > 200) {
System.out.println(line[0] + "," + line[1] + "," + line[2]);
context.write(new Text(line[0] + "," + line[1] + ",") , new Text(line[2]) );
//context
}
}
}
HiveDriver
public class HiveDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = new Job(conf,"Mapreduce in Hive");
job.setJarByClass(HiveDriver.class);
job.setMapperClass(HiveMapper.class);
job.setMapOutputKeyClass(TextInputFormat.class);
job.setMapOutputValueClass(TextInputFormat.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
//int ss = job.waitForCompletion(true) ? 0 : 1;
//System.out.println(ss);
}
}
电流输出
0 1,blue,200
12 2,green,170
25 3,yellow, 400
40 4,yellow, 230
55 5,green, 140
69 6,red, 542
81 7,green, 43
94 8,blue, 228
107 9,red,190
预期输出
3,黄色,4004,黄色,2306,红色,5428,蓝色,228
map Task没有使用您的map函数。相反,它使用映射函数的默认实现。之所以会发生这种情况,是因为Class声明和函数参数中的键类型不同。在类声明中,有K->LongWritable,但在函数中,有键文本。
由于类型不相同,映射函数实现不会覆盖默认实现。
您的地图功能的正确签名应该是:
public void map(LongWritable key, Text value, Context context)