MapReduce in Java Hadoop



I am new to Hadoop. I have a file in the following format:

123textfinder. It is a fixed-width file and I want to add a delimiter. Say my first field is 123, which has length 3, my second field is textfinder, which has length 10, and my third field is later, which has length 5. Every field has a predefined length. Now I need to add a delimiter to separate my fields, so my output should be 123|textfinder|later. I only have the values (the lines in the file). What should the key be for the mapper and reducer programs?

Thanks in advance.

In your particular case you do not even need a reducer. The key/value pair coming into the mapper is still the line offset and the line itself; you simply write the line back out, with the delimiters added, as the output key. Check the following code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Delimiter extends Configured implements Tool {
  public static class DelimiterMapper
      extends Mapper<LongWritable, Text, Text, NullWritable> {
  // Re-slice the fixed-width record (field widths 3, 10 and 5) and rejoin the
  // fields with the delimiter character.
  private static Text addDelimiter(Text value, char delimiter) {
     String str = value.toString();
     String ret = str.substring(0, 3) + delimiter + str.substring(3, 13) + delimiter + str.substring(13);
     return new Text(ret);
  }
  // The input key (byte offset of the line) is not needed; the delimited line
  // itself is written out as the key, with a NullWritable value.
  @Override
  public void map(LongWritable key, Text value, Context context)
                   throws IOException, InterruptedException {
       context.write(addDelimiter(value, '|'), NullWritable.get());
    }
  }  
  public int run(String[] args)
     throws IOException, InterruptedException, ClassNotFoundException {
    Job job = Job.getInstance(getConf());
    if (args.length != 2) {
       System.err.println("Usage: Delimiter <in> <out>"); 
       return 2;
    }
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputDir = new Path(args[1]);
    if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
        throw new IOException("Output directory " + outputDir +
                              " already exists");
    }
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setJobName("Delimiter");
    job.setJarByClass(Delimiter.class);
    job.setMapperClass(DelimiterMapper.class);
    job.setNumReduceTasks(0);   // map-only job: no reducer is needed
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    return job.waitForCompletion(true) ? 0:1; 
  }
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Delimiter(), args);
    System.exit(res);
  }
}
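
Once the class above is compiled and packaged, the job can be launched from the command line roughly as sketched below (the jar name and the HDFS paths are placeholders, not part of the answer). Because it is a map-only job, the delimited lines end up in part-m-* files under the output directory:

hadoop jar delimiter.jar Delimiter /user/hadoop/fixed-width-input /user/hadoop/delimited-output
hdfs dfs -cat /user/hadoop/delimited-output/part-m-00000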
