Hadoop MapReduce: store only values in HDFS



I use this to remove duplicate lines:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DLines
{
    public static class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // Emit the whole input line as the key with a count of 1.
            context.write(value, one);
        }
    }

    public static class TokenCounterReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            // Count how many times this line occurred in the input.
            int sum = 0;
            for (IntWritable value : values)
            {
                sum += value.get();
            }
            // Keep only lines that occur exactly once; duplicated lines are dropped entirely.
            if (sum < 2)
            {
                context.write(key, new IntWritable(sum));
            }
        }
    }
}

If you do not need the value from your reducer, just use NullWritable.

You can then write context.write(key, NullWritable.get());

In your driver, you would also set

 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);

and

 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(NullWritable.class);
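
For completeness, here is a minimal driver sketch that ties these settings together. It assumes the NullWritable variant of the reducer (as shown in the answer below); the class name DLinesDriver, the job name, and the path arguments are illustrative, not from the original post.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 public class DLinesDriver
 {
     public static void main(String[] args) throws Exception
     {
         Configuration conf = new Configuration();
         Job job = Job.getInstance(conf, "remove duplicate lines");
         job.setJarByClass(DLinesDriver.class);

         job.setMapperClass(DLines.TokenCounterMapper.class);
         job.setReducerClass(DLines.TokenCounterReducer.class);

         // The map output value type differs from the final output value type,
         // so both pairs of classes must be set explicitly.
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(IntWritable.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);

         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));

         System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
 }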

You can use the NullWritable class.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DLines
{
    public static class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            // Emit the whole input line as the key with a count of 1.
            context.write(value, one);
        }
    }

    public static class TokenCounterReducer extends Reducer<Text, IntWritable, Text, NullWritable>
    {
        NullWritable out = NullWritable.get();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            // Count how many times this line occurred in the input.
            int sum = 0;
            for (IntWritable value : values)
            {
                sum += value.get();
            }
            // Keep only lines that occur exactly once, emitting no value.
            if (sum < 2)
            {
                context.write(key, out);
            }
        }
    }
}

Driver code:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

Hope this answers your question.
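
As a rough usage sketch, assuming the classes are packaged into a jar and driven by a main class like the DLinesDriver sketch above (the jar name dedup.jar and the HDFS paths are hypothetical), the job can be launched and its output inspected with:

 hadoop jar dedup.jar DLinesDriver /user/hduser/input /user/hduser/output
 hdfs dfs -cat /user/hduser/output/part-r-00000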
