我是Hadoop的新手。我有一个以下格式的文件:
123textfinderlater。这是一个固定宽度的文件,我想在字段之间添加分隔符。假设第一个字段是123,长度为3;第二个字段是textfinder,长度为10;第三个字段是later,长度为5。每个字段都有预定义的长度。现在我需要添加分隔符来分隔这些字段,最终输出应该是123|textfinder|later。我手里只有值(文件中的各行)。那么mapper和reducer程序的键应该是什么?
提前感谢
在您的这种情况下,甚至不需要reducer。mapper的输入键值对仍然是“行号 - 行内容”(使用默认的TextInputFormat时,键实际上是该行在文件中的字节偏移量),您只需把添加了分隔符的行作为输出键写回即可。请参考以下代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Delimiter extends Configured implements Tool {
public static class DelimiterMapper
extends Mapper<LongWritable, Text, Text, NullWritable> {
private static Text addDelimiter(Text value, char delimiter) {
String str = value.toString();
String ret = str.substring(0,2) + delimiter + str.substring(3,12) + delimiter + str.substring(13);
return new Text(ret);
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(addDelimiter(value, '|'), NullWritable.get());
}
}
public int run(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(getConf());
if (args.length != 2) {
System.err.println("Usage: Delimiter <in> <out>");
return 2;
}
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputDir = new Path(args[1]);
if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
throw new IOException("Output directory " + outputDir +
"already exists");
}
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("Delimiter");
job.setJarByClass(Delimiter.class);
job.setMapperClass(DelimiterMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
return job.waitForCompletion(true) ? 0:1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Delimiter(), args);
System.exit(res);
}
}