如何使用mapreduce程序从reducer中获取字符串作为值



有人能帮我一下吗?我需要得到文本作为减速器类的输出。但是当我尝试时,它显示了一些错误

错误:java.lang.NumberFormatException: For input string: "open"java.lang.NumberFormatException.forInputString (NumberFormatException.java: 65)java.lang.Integer.parseInt (Integer.java: 492)java.lang.Integer.parseInt (Integer.java: 527)com.sciera.mapreduce.OpenClose TextSumReducer.reduce美元(OpenClose.java: 85)在com.sciera.mapreduce.OpenClose TextSumReducer.reduce美元(OpenClose.java: 1)org.apache.hadoop.mapreduce.Reducer.run (Reducer.java: 171)org.apache.hadoop.mapred.ReduceTask.runNewReducer (ReduceTask.java: 627)org.apache.hadoop.mapred.ReduceTask.run (ReduceTask.java: 389)在org.apache.hadoop.mapred.YarnChild 2.美元运行(YarnChild.java: 163)在java.security.AccessController。doPrivileged(本地方法)javax.security.auth.Subject.doAs (Subject.java: 415)org.apache.hadoop.security.UserGroupInformation.doAs (UserGroupInformation.java:> 1671)org.apache.hadoop.mapred.YarnChild.main (YarnChild.java: 158)

public class ClassName {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text>{       
         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
           String[] str = value.toString().split(",");
           String rwid=str[10];
           context.write(new Text(rwid), new Text(value));
        }
    }
    public static class TextSumReducer extends Reducer<Text,Text,Text,Text> {
         private Text result = new Text();      
         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
               int open = 0;
               String value         = "";
               String pDate     = "0000-00-00";
               for (Text val : values) {
                   String[] valSeg          = val.toString().split(",");
                   int id                   = Integer.parseInt(valSeg[0]);
                   String date  = valSeg[4];
                   String fs        = valSeg[6];
                   String fDate         = valSeg[7];
                   String fStatus       = valSeg[8];
                   int lId              = Integer.parseInt(valSeg[9]);
                   if(date.equalsIgnoreCase("null")){
                       date = "0000-00-00";
                   }
                   SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                   Date date1,date2;
                   try {
                       date1            = sdf.parse(date);
                       date2            = sdf.parse(pDate);
                       long diff        = date1.getTime() - date2.getTime();
                       long gapDays     = TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS);
                       String origStatus = "";
                       if(pDate!=fDate && (fStatus != "" && fStatus!="null") && lId>0 && (fDate!= "null" && fDate!="0000-00-00")){
                           if(fStatus.equalsIgnoreCase("a") || fStatus.equalsIgnoreCase("aa") || fStatus.equalsIgnoreCase("aaa")){
                               origStatus = "a";
                               open += 1;
                           }else if(!fStatus.equalsIgnoreCase("ccc")){
                               origStatus = "b";
                           }else{
                               origStatus = "";
                           }
                           pDate = fDate;
                           value = origStatus;
                           result.set(new Text(value));
                           context.write(key, result);
                       }
                   } catch (ParseException e) {
                        // TODO Auto-generated catch block
                       e.printStackTrace();
                   }
               }
         }
    }
    public static void main(String[] args) throws Exception {
        Path p = new Path(args[1]);
        FileSystem date = FileSystem.get(new Configuration());
        date.exists(p);
        date.delete(p, true);
        Configuration conf = new Configuration();
        conf.set("key.value.separator.in.input.line", ",");
        Job job = Job.getInstance(conf, "Sample");
        job.setJarByClass(ClassName.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(TextSumReducer.class);
        job.setReducerClass(TextSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

最后,我通过另一个代码逻辑得到了答案。

public class TestClass {
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text>{
     private Text word = new Text();
     private final NullWritable nullKey = NullWritable.get();
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         String[] valSeg            = value.toString().split("\|");
         String rwid                = valSeg[10];
         context.write(new Text(rwid),  value);
    }
}
public static class TextSumReducer extends Reducer<Text,Text,Text,Text> {
    private final NullWritable nullKey = NullWritable.get();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int valueItr = 0;
        for (Text val : values) {               
            String valueToSplit = val.toString().trim();
            String valSeg[] = valueToSplit.split("\|");

           int open                 = 0;
           String value             = "";
           String prevDateAgg       = "0000-00-00";
           int prevId               = 0;
           String[] valSeg          = val.toString().split(",");
           String source            = valSeg[2];
           String state             = valSeg[3];
           String dateAggregated    = valSeg[4];
           String status            = valSeg[5];
           String firstSource       = valSeg[6];
           String firstDate         = valSeg[7];
           String firstStatus       = valSeg[8];
           int lifeReId             = Integer.parseInt(valSeg[9]);
           String rwid              = valSeg[10];
           if(dateAggregated.equalsIgnoreCase("null")){
               dateAggregated = "0000-00-00";
           }
           SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
           Date date1,date2;
           try {
               date1                = sdf.parse(dateAggregated);
               date2                = sdf.parse(prevDateAgg);
               long diff            = date1.getTime() - date2.getTime();
               long gapDays         = TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS);
               String origStatus    = "";
               if(prevDateAgg!=firstDate && (firstStatus != "" && firstStatus!="null") && lifeReId>0 && (firstDate!= "null" && firstDate!="0000-00-00")){
                   if(firstStatus.equalsIgnoreCase("for_rent") || firstStatus.equalsIgnoreCase("for_sale") || firstStatus.equalsIgnoreCase("for sale") || firstStatus.equalsIgnoreCase("for rent") || firstStatus.equalsIgnoreCase("Foreclosure") || firstStatus.equalsIgnoreCase("Reo")){
                       origStatus = "open";
                       String content = rwid + "|" + firstDate + "|" + origStatus;
                       context.write(key,new Text(content));
                   }else if(!firstStatus.equalsIgnoreCase("off_market")){
                       origStatus = "close";
                   }else{
                       origStatus = "";
                   }
                   prevDateAgg = firstDate;
                   value = origStatus;                     
               }
           } catch (ParseException e) {
               e.printStackTrace();
           }
        }
    }
}
public static void main(String[] args) throws Exception {
    Path p = new Path(args[1]);
    FileSystem fs = FileSystem.get(new Configuration());
    fs.exists(p);
    fs.delete(p, true);
    Configuration conf = new Configuration();
    conf.set("key.value.separator.in.input.line", "|");
    Job job = Job.getInstance(conf, "OpenClose");
    job.setJarByClass(TestClass.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(TextSumReducer.class);
    job.setReducerClass(TextSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(1);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

相关内容

  • 没有找到相关文章

最新更新