Mapreduce with 2 keys



我只是在学习map reduce作业。我对我的作业做了一件事,我必须更改我的代码以接受另一个文本文件作为输入,输出必须显示带有最大值、最小值和平均值的年份位置。这是我输入的一行的示例:Calgary,AB,2009-01-07,604680,12694,2.5207754,0.065721168,0.025668362,0.972051954,0.037000279,0.022319018,,,0.003641149,,,0.002936745,,,0.016723641

并且输出应该类似于:Calgary 2009 Average is: Max: Min:

这是我的代码,它给出了txt文件并计算平均值、最小值和最大值:

public class AverageMinMax {

public static class Map extends Mapper<LongWritable,Date,Text,Text> {

    //private static final FloatWritable rep= new  FloatWritable(1);
        public void map(LongWritable key,Text value,Context context)
        throws IOException, InterruptedException {
                context.write(new Text("Map_Output"), value);
        };
    }
      public static class Combiner extends Reducer<Text,Text,Text,Text>
      {
      public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
          {
             Integer NumberOfValues=0;
             double sum=0D;
             double min=0D;
             double max=0D;
             //double min=values.get(0);
              Iterator<Text> itr = values.iterator();
              //convertString=values(0);
              while(itr.hasNext())
              {
                  String TexttoString = itr.next().toString();
                  Double value = Double.parseDouble(TexttoString);
                  if(value<min)
                  {
                      min=value;
                  }
                  if(value>max)
                  {
                      max=value;
                  }
                  NumberOfValues++;
                  sum+=value;
              }
               Double average = sum/NumberOfValues;
                context.write(new Text("Combiner_output"), new Text(average + "," + NumberOfValues+","+min+","+max));
          };
      }
 public static class Reduce extends
       Reducer<Text,Text,Text,Text> {
      public void reduce(Text key, Iterable<Text> values,
        Context context) throws IOException, InterruptedException {
           Integer totalNumberOfValues= 0;
          Double sum=0.00;
          Double min=0D;
          Double max=0D;
          Iterator<Text> itr = values.iterator();
            while(itr.hasNext())
          {
              String TexttoString = itr.next().toString();
              String[] split_String = TexttoString.split(",");
              Double average = Double.parseDouble(split_String[0]);
              Integer NumberOfValues = Integer.parseInt(split_String[1]);
              Double minValue=Double.parseDouble(split_String[2]);
              Double maxValue=Double.parseDouble(split_String[3]);
              if(minValue<min)
              {
                  min=minValue;
              }
              if(maxValue>max)
              {
                  max=maxValue;
              }
              sum+=(average*NumberOfValues);
              totalNumberOfValues+=NumberOfValues;   
          } 
          Double average= sum/totalNumberOfValues;
          context.write(new Text("Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));
          };
     }
     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         Job job=new Job(conf,"AverageMinMax.class");
         job.setJarByClass(AverageMinMax.class);
         job.setJobName("MapReduceAssignment");
         //JobConf conf = new JobConf(Hadoop_map_reduce.class);
        //conf.setJobName("Hadoop_assignment");
         // Configuration conf = new Configuration();
      //Job job = new Job(conf, "maxmin");
      //job.setJarByClass(Hadoop_map_reduce.class);
     // FileSystem fs = FileSystem.get(conf);
    /*  if (fs.exists(new Path(args[1]))) {
       fs.delete(new Path(args[1]), true);
      }*/
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
         //job.setNumReduceTasks(1);
         job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
         job.setCombinerClass(Combiner.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
     FileInputFormat.addInputPath(job, new Path(args[0]));
    //  FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //FileInputFormat.addInputPath(job, new Path("/home/cloudera/Desktop/assign2"));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
      //  FileOutputFormat.setOutputPath(job, new Path(" user/cloudera/output"));
      job.waitForCompletion(true);
     }

}

所以,我的第一个问题是,我不知道如何在映射器中转换日期,以及如何找到2个键并在输出中显示。我的意思是如何重写这个代码!

我感谢你的帮助

您的问题并不完全清楚。因此,我的假设如下:

  1. 您有一组数据,其中显示了要处理的位置、日期和一些双值
  2. 要处理的值从第一个双值开始(即2.5207754,…)
  3. 您的平均值是每年整个观测的所有列的平均值。(即,如果您有5个2009年的样本,并且每个样本有5个值,则需要25个值的平均值)
  4. 您的最小值和最大值是各个年份的整个观测值的最小值或最大值

如果假设是正确的,我建议你使用林书豪教授的自定义数据类型。一个可能的解决方案如下:

  1. 您的密钥将是位置和年份组合到文本中。

    String line = value.toString();
    String[] tokens = line.split(",");
    String[] date = tokens[2].split("-");
    String year = date[0];
    String location = tokens[0];
    Text locationYear = new Text(location + " " + year);
    
  2. 然后,您的值将是ArrayListOfDoublesWritable,您可以从我上面提到的回购中使用它。

    ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable()
    for(int i = 5; i < tokens.length(); i++)
    {
      readings.add(Double.parseDouble(tokens[i]));
    }
    
  3. 然后,您可以将映射器输出作为Text和ArrayListOfDoublesWritable发出。

    context.write(locationYear, readings);
    

从这里,您可以通过使用数组列表的Collections方法,通过计算(平均值、最小值、最大值)来操纵减速器中的映射器输出。

我希望这能有所帮助。

好吧,您似乎有多个问题。立刻想到的两个:

  • 您的映射器输出密钥是'Combiner_Output'。这行不通。你想要这把钥匙是什么,可能是城市名称。在你的例子中,"卡尔加里"。这很容易使用value.toString().split(',')[0](即,从在,字符上分割value之后形成的列表中获得第一个元素)
  • 您的reducer代码根本没有输出城市名称。这可以通过在Reducer中执行context.write(new Text(key.toString() + " Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));来解决,其中key是来自上面点的城市名称

关于如何从字符串中提取日期,在Java中,请查看以下SO文章:从字符串中提取日期

一般来说,我建议您从Mapreduce是什么开始,它的设计权衡,以及如何在Hadoop架构的范围内充分利用它。

相关内容

  • 没有找到相关文章

最新更新