In the new API (org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat), how can I specify a separator (delimiter) other than tab (the default) to separate the key and the value?
Sample input:
one,first line
two,second line
Desired output:
Key : one
Value : first line
Key : two
Value : second line
I specified KeyValueTextInputFormat as follows:
Job job = new Job(conf, "Sample");
job.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.addInputPath(job, new Path("/home/input.txt"));
This works fine when the separator is a tab.
In the newer API you should use the mapreduce.input.keyvaluelinerecordreader.key.value.separator configuration property.
Here is an example:
Configuration conf = new Configuration();
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
Job job = new Job(conf);
job.setInputFormatClass(KeyValueTextInputFormat.class);
// next job set-up
Set the following in your driver code:
conf.set("key.value.separator.in.input.line", ",");
For KeyValueTextInputFormat, an input line is expected to be a key-value pair separated by "\t":
Key1 Value1,Value2
By changing the default separator, you can split the line wherever you like.
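To make the effect of the separator concrete, here is a minimal plain-Java sketch (not Hadoop code) of the rule KeyValueLineRecordReader applies: the line is split at the first occurrence of the separator, and everything after it becomes the value. The split helper is illustrative only.

```java
import java.util.Arrays;

public class SeparatorDemo {
    // Split at the FIRST occurrence of the separator, as the record reader does;
    // lines without the separator yield the whole line as key and an empty value.
    static String[] split(String line, char sep) {
        int pos = line.indexOf(sep);
        if (pos < 0) return new String[] { line, "" };
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        String line = "Key1\tValue1,Value2";
        // With the default tab separator: key=Key1, value=Value1,Value2
        System.out.println(Arrays.toString(split(line, '\t')));
        // With a comma separator: key=Key1<tab>Value1, value=Value2
        System.out.println(Arrays.toString(split(line, ',')));
    }
}
```

So the same line parses differently depending on which separator you configure.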
For the new API, here is the solution:
//New API
Configuration conf = new Configuration();
conf.set("key.value.separator.in.input.line", ",");
Job job = new Job(conf);
job.setInputFormatClass(KeyValueTextInputFormat.class);
The Mapper:
public class Map extends Mapper&lt;Text, Text, Text, IntWritable&gt; {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("key---> " + key);
        System.out.println("value---> " + value.toString());
        .
        .
Output:
key---> one
value---> first line
key---> two
value---> second line
This is an ordering problem.
The line conf.set("key.value.separator.in.input.line", ",")
must come before the Job instance is created. So:
conf.set("key.value.separator.in.input.line", ",");
Job job = new Job(conf);
First, the new API was not completed in 0.20.*, so if you want to use the new API on 0.20.* you have to implement the feature yourself. For example, you can use FileInputFormat, ignore the LongWritable key, and split the Text value on the comma yourself.
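As a sketch of that workaround, the plain-Java snippet below treats each string as the Text value a TextInputFormat mapper would receive, ignores the byte-offset key, and splits at the first comma itself. splitKeyValue is an illustrative helper, not a Hadoop API; for the sample input it reproduces the desired output from the question.

```java
import java.util.Arrays;
import java.util.List;

public class ManualSplitSketch {
    // Split a line at the first comma; a line without a comma
    // becomes (whole line, empty value).
    static String[] splitKeyValue(String value) {
        int pos = value.indexOf(',');
        if (pos < 0) return new String[] { value, "" };
        return new String[] { value.substring(0, pos), value.substring(pos + 1) };
    }

    public static void main(String[] args) {
        // Stand-ins for the Text values TextInputFormat would emit;
        // the LongWritable byte offset is simply ignored.
        List<String> lines = Arrays.asList("one,first line", "two,second line");
        for (String value : lines) {
            String[] kv = splitKeyValue(value);
            System.out.println("Key : " + kv[0]);
            System.out.println("Value : " + kv[1]);
        }
    }
}
```

Inside a real mapper the same splitting logic would go in the map() method, with the pieces wrapped back into Text objects before writing to the context.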
By default, the KeyValueTextInputFormat
class uses a tab as the separator between key and value in the input text file.
If you want to read the input with a custom separator, you must set the corresponding property on the configuration you are using.
For the new Hadoop API, it is different:
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ";");
Example:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;

public class KeyValueTextInput extends Configured implements Tool {
    public static void main(String args[]) throws Exception {
        String log4jConfPath = "log4j.properties";
        PropertyConfigurator.configure(log4jConfPath);
        int res = ToolRunner.run(new KeyValueTextInput(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        //conf.set("key.value.separator.in.input.line", ",");
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "WordCountSampleTemplate");
        job.setJarByClass(KeyValueTextInput.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(new URI(outputPath.toString()), conf);
        fs.delete(outputPath, true); // remove any previous output
        FileOutputFormat.setOutputPath(job, outputPath);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
class Map extends Mapper&lt;Text, Text, Text, Text&gt; {
    public void map(Text k1, Text v1, Context context) throws IOException, InterruptedException {
        context.write(k1, v1);
    }
}

class Reduce extends Reducer&lt;Text, Text, Text, Text&gt; {
    public void reduce(Text Key, Iterable&lt;Text&gt; values, Context context) throws IOException, InterruptedException {
        String sum = " || ";
        for (Text value : values)
            sum = sum + value.toString() + " || ";
        context.write(Key, new Text(sum));
    }
}
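To see what that reducer produces, here is a small plain-Java sketch of its concatenation logic, using an ordinary List in place of Hadoop's Iterable&lt;Text&gt;; the concat helper is illustrative only.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceDemo {
    // Mirrors the string concatenation done in the Reduce class above:
    // start with " || " and append each value followed by " || ".
    static String concat(List<String> values) {
        String sum = " || ";
        for (String value : values)
            sum = sum + value + " || ";
        return sum;
    }

    public static void main(String[] args) {
        // For the sample input each key has one value, so a line of
        // output looks like: one || first line ||
        System.out.println("one" + concat(Arrays.asList("first line")));
    }
}
```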