如何在Hadoop-.20api中指定KeyValueTextInputFormat分隔符



在新的API(apache.hadop.mapreduce.KeyValueTextInputFormat)中,如何指定除制表符(默认值)之外的分隔符(分隔符)来分隔键和值。

样本输入:

one,first line
two,second line

所需输出:

Key : one
Value : first line
Key : two
Value : second line

我将KeyValueTextInputFormat指定为:

    Job job = new Job(conf, "Sample");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    KeyValueTextInputFormat.addInputPath(job, new Path("/home/input.txt"));

这对于作为分隔符的选项卡来说很好。

在较新的API中,应使用mapreduce.input.keyvaluelinerecordreader.key.value.separator配置属性。

这里有一个例子:

Configuration conf = new Configuration();
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
Job job = new Job(conf);
job.setInputFormatClass(KeyValueTextInputFormat.class);
// next job set-up

请在驱动程序代码中设置以下内容。

conf.set("key.value.separator.in.input.line", ",");

对于KeyValueTextInputFormat,输入行应该是由"\t"分隔的键值对

Key1     Value1,Value2

通过更改默认分隔符,您将能够随心所欲地阅读。

对于新Api

这是的解决方案

//New API
Configuration conf = new Configuration();
conf.set("key.value.separator.in.input.line", ","); 
Job job = new Job(conf);
job.setInputFormatClass(KeyValueTextInputFormat.class);

映射

public class Map extends Mapper<Text, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    System.out.println("key---> "+key);
    System.out.println("value---> "+value.toString());
   .
   .

输出

key---> one
value---> first line
key---> two
value---> second line

这是一个序列问题。

在创建Job类的实例之前,第一行conf.set("key.value.separator.in.input.line", ",")必须出现。因此:

conf.set("key.value.separator.in.input.line", ","); 
Job job = new Job(conf);

首先,新的API没有在0.20.*中完成,所以如果你想在0.20.**中使用新的API,你应该自己实现这个功能。例如,您可以使用FileInputFormat来实现。忽略LongWritable键,自己用逗号分隔Text值。

默认情况下,KeyValueTextInputFormat类使用tab作为输入文本文件中键和值的分隔符。

如果要从自定义分隔符读取输入,则必须使用正在使用的属性设置配置。

对于新的Hadoop API,情况有所不同:

conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ";");

示例

public class KeyValueTextInput extends Configured implements Tool {
    public static void main(String args[]) throws Exception {
        String log4jConfPath = "log4j.properties";
        PropertyConfigurator.configure(log4jConfPath);
        int res = ToolRunner.run(new KeyValueTextInput(), args);
        System.exit(res);
    }
    public int run(String[] args) throws Exception {

配置conf=this.getConf();

        //conf.set("key.value.separator.in.input.line", ",");

conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");

        Job job = Job.getInstance(conf, "WordCountSampleTemplate");
        job.setJarByClass(KeyValueTextInput.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(new URI(outputPath.toString()), conf);
        fs.delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
class Map extends Mapper<Text, Text, Text, Text> {
    public void map(Text k1, Text v1, Context context) throws IOException, InterruptedException {
        context.write(k1, v1);
    }
}
class Reduce extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text Key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String sum = " || ";
        for (Text value : values)
            sum = sum + value.toString() + " || ";
        context.write(Key, new Text(sum));
    }
}

相关内容

  • 没有找到相关文章

最新更新