MapReduce：Reducer 输出了错误的日期



我编写了一个MapReduce程序来解析CSV中的值。

数据集如下-

PRAVEEN,4002013454,宝贝,026a12,12/04/2015

PRAVEEN,4002013454,玩具,020383,1/04/2014

PRAVEEN,2727272727272,书,03383,03/14/2013

PRAVEEN,2263637373,自行车,7373737,12/24/2012

我的 Map 函数读取 CSV 记录的第一个字段（即用户名 UserName）作为 KEY，最后一个字段（即日期）作为 VALUE。

我的 Reduce 函数也很简单：对每个键（即 UserName），从它的 VALUE 列表中选出最近的日期，作为该键的输出值。

代码如下-

  package com.test.mapreduce;
  import java.io.IOException;
  import java.text.ParseException;
  import java.text.SimpleDateFormat;
  import java.util.ArrayList;
  import java.util.Date;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Set;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.KeyValueTextInputFormat;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.TextInputFormat;
  import org.apache.hadoop.mapred.TextOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;


 public class RetailCustomerAnalysis_2 extends Configured implements Tool {
             public static class MapClass extends MapReduceBase
             implements Mapper<LongWritable, Text, Text, Text> {
      private Text key1 = new Text();
      private Text value1 = new Text();
      private int noofFields = 5;

 public void map(LongWritable key, Text value,
                 OutputCollector<Text, Text> output,
                 Reporter reporter) throws IOException {
        String line = value.toString().replaceAll("\s+","");
        String[] split = line.split(",");

        if(split.length!=noofFields){
        return;
        }
        else {
            key1.set(split[0].toString().trim()); 
            value1.set(split[4].toString().trim());
            System.out.println(split[4].toString().trim());
            output.collect(key1, value1);
     }
    }
  }
 public static class Reduce extends MapReduceBase
 implements Reducer<Text, Text, Text, Text> {
 public void reduce(Text key, Iterator<Text> values,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
     SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
     Date date = new Date();
     List<Text> dateList = new ArrayList<Text>();
     for(Iterator<Text> it = values; it.hasNext();) {
         // add the values in the arrayList
         dateList.add((Text) it.next());
     }

     if(dateList.size()==1){ //If the mapper output has only one date , then select that date 
                             // as the VALUE
     try  {
            date = formatter.parse(dateList.get(0).toString());
          } catch (ParseException e) {
            e.printStackTrace();
        }
     } //If part ends 
     else {
             try {
               date = formatter.parse(dateList.get(0).toString()); 
                      //select the first date from list
             } catch (ParseException e1) {
               e1.printStackTrace();
             }
             for(int i=0 ; i <dateList.size();++i){
                   try {
                   //compare the selected date with the rest of the dates in the list.
                   if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){
                       date=formatter.parse(dateList.get(i).toString());
                       // getting the max date from the list
                        }
                   }
                   catch (ParseException e) {
                  e.printStackTrace();
                }
             } //for loops ends
     }  // else part ends    
     Text value = new Text(date.toString());
       output.collect(key, value);
      }
  }

 public int run(String[] args) throws Exception {
 Configuration conf = getConf();
 JobConf job = new JobConf(conf, RetailCustomerAnalysis_2.class);
 Path in = new Path(args[0]);
 Path out = new Path(args[1]);
 FileInputFormat.setInputPaths(job, in);
 FileOutputFormat.setOutputPath(job, out);
 job.setJobName("RetailCustomerAnalysis_2");
 job.setMapperClass(MapClass.class);
 job.setReducerClass(Reduce.class);
 job.setInputFormat(TextInputFormat.class);
 job.setOutputFormat(TextOutputFormat.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(Text.class);
 job.set("key.value.separator.in.input.line", ",");
 JobClient.runJob(job);
 return 0;
  }
public static void main(String[] args) throws Exception { 
 int res = ToolRunner.run(new Configuration(), new RetailCustomerAnalysis_2(), args);
 System.exit(res);
 }
 }

但输出结果却是列表中的某个随机日期，而不是最近的日期。有谁能帮忙吗？

代码基本正确，只是 Reducer 的实现需要稍作修改。问题出在下面这段代码：

// Problem: Hadoop reuses a single Text instance for every element of `values`,
// so each iteration adds the same object to dateList. Once the loop finishes,
// every element of the list holds whatever value was read last.
for(Iterator<Text> it = values; it.hasNext();) {
   // adds the (reused) Text object itself, not a copy of its current contents
   dateList.add((Text) it.next());
}

在上面的代码片段中，Hadoop 在每次迭代时都复用同一个 Text 值对象，只是修改它的内容。因此 dateList 中保存的其实是对同一个对象的多次引用，而不是各个不同的值。

例如，假设 MapReduce 作业使用以下输入运行：

PRAVEEN,4002013454,宝贝,026a12,12/04/2015

PRAVEEN,4002013454,玩具,020383,1/04/2014

PRAVEEN,2727272727272,书,03383,03/14/2013

PRAVEEN,2263637373,自行车,7373737,12/24/2012

for 循环结束后，reduce 方法中 dateList 的每个元素的值都是 12/24/2012，即 (12/24/2012, 12/24/2012, 12/24/2012, 12/24/2012)，也就是最后读取的那个值。因此后续的比较逻辑处理的是错误的数据，最终输出也是错误的。

将 Reducer 改为如下代码：

public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
        Date date = new Date();
        //-----Modified section START-----------
        List<String> dateList = new ArrayList<String>();
        for(Iterator<Text> it = values; it.hasNext();) {
            // add the values in the arrayList
            dateList.add(((Text)it.next()).toString());
        }
        //----Modified section END--------------
        if(dateList.size()==1){ //If the mapper output has only one date , then select that date 
            // as the VALUE
            try  {
                date = formatter.parse(dateList.get(0).toString());
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } //If part ends 
        else {
            String str = dateList.get(0).toString();
            try {
                date = formatter.parse(dateList.get(0).toString());
                //select the first date from list
            } catch (ParseException e1) {
                e1.printStackTrace();
            }
            for(int i=0 ; i <dateList.size();++i){
                try {
                    //compare the selected date with the rest of the dates in the list.
                    if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){
                        date=formatter.parse(dateList.get(i).toString());
                        // getting the max date from the list
                    }
                }
                catch (ParseException e) {
                    e.printStackTrace();
                }
            } //for loops ends
        }  // else part ends    
        Text value = new Text(date.toString());
        output.collect(key, value);
    }
}

关于 map 和 reduce 方法中对象引用（对象复用）的更多细节，请参考问题《Hadoop Reducer Values in Memory?》。

相关内容

  • 没有找到相关文章

最新更新