我只是在学习map reduce作业。我对我的作业做了一件事,我必须更改我的代码以接受另一个文本文件作为输入,输出必须显示带有最大值、最小值和平均值的年份位置。这是我输入的一行的示例:Calgary,AB,2009-01-07,604680,12694,2.5207754,0.065721168,0.025668362,0.972051954,0.037000279,0.022319018,,,0.003641149,,,0.002936745,,,0.016723641
并且输出应该类似于:Calgary 2009 Average is: Max: Min:
这是我的代码,它给出了txt文件并计算平均值、最小值和最大值:
public class AverageMinMax {
public static class Map extends Mapper<LongWritable,Date,Text,Text> {
//private static final FloatWritable rep= new FloatWritable(1);
public void map(LongWritable key,Text value,Context context)
throws IOException, InterruptedException {
context.write(new Text("Map_Output"), value);
};
}
public static class Combiner extends Reducer<Text,Text,Text,Text>
{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
{
Integer NumberOfValues=0;
double sum=0D;
double min=0D;
double max=0D;
//double min=values.get(0);
Iterator<Text> itr = values.iterator();
//convertString=values(0);
while(itr.hasNext())
{
String TexttoString = itr.next().toString();
Double value = Double.parseDouble(TexttoString);
if(value<min)
{
min=value;
}
if(value>max)
{
max=value;
}
NumberOfValues++;
sum+=value;
}
Double average = sum/NumberOfValues;
context.write(new Text("Combiner_output"), new Text(average + "," + NumberOfValues+","+min+","+max));
};
}
public static class Reduce extends
Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
Integer totalNumberOfValues= 0;
Double sum=0.00;
Double min=0D;
Double max=0D;
Iterator<Text> itr = values.iterator();
while(itr.hasNext())
{
String TexttoString = itr.next().toString();
String[] split_String = TexttoString.split(",");
Double average = Double.parseDouble(split_String[0]);
Integer NumberOfValues = Integer.parseInt(split_String[1]);
Double minValue=Double.parseDouble(split_String[2]);
Double maxValue=Double.parseDouble(split_String[3]);
if(minValue<min)
{
min=minValue;
}
if(maxValue>max)
{
max=maxValue;
}
sum+=(average*NumberOfValues);
totalNumberOfValues+=NumberOfValues;
}
Double average= sum/totalNumberOfValues;
context.write(new Text("Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));
};
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job=new Job(conf,"AverageMinMax.class");
job.setJarByClass(AverageMinMax.class);
job.setJobName("MapReduceAssignment");
//JobConf conf = new JobConf(Hadoop_map_reduce.class);
//conf.setJobName("Hadoop_assignment");
// Configuration conf = new Configuration();
//Job job = new Job(conf, "maxmin");
//job.setJarByClass(Hadoop_map_reduce.class);
// FileSystem fs = FileSystem.get(conf);
/* if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]), true);
}*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setNumReduceTasks(1);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Combiner.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
//FileInputFormat.addInputPath(job, new Path("/home/cloudera/Desktop/assign2"));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// FileOutputFormat.setOutputPath(job, new Path(" user/cloudera/output"));
job.waitForCompletion(true);
}
}
所以,我的第一个问题是,我不知道如何在映射器中转换日期,以及如何找到2个键并在输出中显示。我的意思是如何重写这个代码!
我感谢你的帮助
您的问题并不完全清楚。因此,我的假设如下:
- 您有一组数据,其中显示了要处理的位置、日期和一些双值
- 要处理的值从第一个双值开始(即2.5207754,…)
- 您的平均值是每年整个观测的所有列的平均值。(即,如果您有5个2009年的样本,并且每个样本有5个值,则需要25个值的平均值)
- 您的最小值和最大值是各个年份的整个观测值的最小值或最大值
如果假设是正确的,我建议你使用林书豪教授的自定义数据类型。一个可能的解决方案如下:
-
您的密钥将是位置和年份组合到文本中。
String line = value.toString(); String[] tokens = line.split(","); String[] date = tokens[2].split("-"); String year = date[0]; String location = tokens[0]; Text locationYear = new Text(location + " " + year);
-
然后,您的值将是ArrayListOfDoublesWritable,您可以从我上面提到的回购中使用它。
ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable() for(int i = 5; i < tokens.length(); i++) { readings.add(Double.parseDouble(tokens[i])); }
-
然后,您可以将映射器输出作为Text和ArrayListOfDoublesWritable发出。
context.write(locationYear, readings);
从这里,您可以通过使用数组列表的Collections方法,通过计算(平均值、最小值、最大值)来操纵减速器中的映射器输出。
我希望这能有所帮助。
好吧,您似乎有多个问题。立刻想到的两个:
- 您的映射器输出密钥是
'Combiner_Output'
。这行不通。你想要这把钥匙是什么,可能是城市名称。在你的例子中,"卡尔加里"。这很容易使用value.toString().split(',')[0]
(即,从在,
字符上分割value
之后形成的列表中获得第一个元素) - 您的reducer代码根本没有输出城市名称。这可以通过在Reducer中执行
context.write(new Text(key.toString() + " Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));
来解决,其中key
是来自上面点的城市名称
关于如何从字符串中提取日期,在Java中,请查看以下SO文章:从字符串中提取日期
一般来说,我建议您从Mapreduce是什么开始,它的设计权衡,以及如何在Hadoop架构的范围内充分利用它。