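Here is the Hadoop word count Java map and reduce source code:
In the map function, I have gotten to the point where I can output every word that starts with the letter "c" along with the total number of times that word appears. What I actually want is just the total number of words that start with the letter "c", and I am a bit stuck on getting that total. Any help would be greatly appreciated, thank you.
Example
The output I am getting: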
could 2
can 3
cat 5
What I am trying to get:
c-total 10
public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            // emits (word, 1) for every token that starts with "c"
            if (word.toString().startsWith("c")) {
                output.collect(word, one);
            }
        }
    }
}
public static class Reduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get(); // adds up the counts emitted for this key
        }
        output.collect(key, new IntWritable(sum)); // outputs the key and its total
    }
}
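Chris Gerken's answer is right.
If you emit the word itself as the key, you only get a separate count for each distinct word that starts with "c",
not the total across all words that start with "c".
So you need to emit a single, constant key from the mapper, so that every matching word is summed in the same reduce call.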
while (itr.hasMoreTokens()) {
    String token = itr.nextToken();
    if (token.startsWith("c")) {
        word.set("C_Count"); // same key for every matching token
        output.collect(word, one);
    }
}
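To see why the choice of key matters, here is a minimal plain-Java sketch that imitates the map, shuffle and reduce steps with an in-memory map instead of Hadoop. The class name `KeyChoiceSketch`, the `run` helper and the sample lines are all made up for illustration; only the key `C_Count` comes from the snippet above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java stand-in for map -> shuffle -> reduce; no Hadoop classes are involved.
class KeyChoiceSketch {

    // Emits (keyFor(token), 1) for every token starting with "c" and sums the
    // ones per key, which is roughly what the shuffle plus a summing reducer do.
    static Map<String, Integer> run(List<String> lines, Function<String, String> keyFor) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.split("\\s+")) {
                if (token.startsWith("c")) {
                    totals.merge(keyFor.apply(token), 1, Integer::sum);
                }
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical input, invented for this sketch.
        List<String> lines = Arrays.asList("can a cat climb", "the cat could");

        // Keyed by the word itself: one entry per distinct word.
        System.out.println(run(lines, token -> token));

        // Keyed by a constant: a single entry holding the overall total.
        System.out.println(run(lines, token -> "C_Count"));
    }
}
```

With the word as the key the result has one entry per distinct word; with the constant key everything collapses into one entry, which is the behaviour the question asks for.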
Here is an example that uses the new API.
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // This constructor is deprecated in Hadoop 2 and later,
        // where Job.getInstance(conf, "wordcount") replaces it.
        Job job = new Job(conf, "wordcount");
        FileSystem fs = FileSystem.get(conf);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Remove a leftover output directory so the job does not fail on startup.
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(WordCount.class);
        // Report success or failure through the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Mapper class
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();
            if (token.startsWith("c")) {
                word.set("C_Count"); // same key for every matching token
                context.write(word, one);
            }
        }
    }
}
Reducer class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
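Instead of
output.collect(word, one);
in your mapper, try the following (the key has to be a Text, so the string literal is wrapped):
output.collect(new Text("c-total"), one);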
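Simpler code for the mapper:
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> op, Reporter r)
        throws IOException {
    String s = value.toString();
    // split on runs of non-word characters; the backslash must be escaped in a Java string
    for (String w : s.split("\\W+")) {
        if (w.length() > 0) {
            // lowercase "c", to match the words the question asks about
            if (w.startsWith("c")) {
                op.collect(new Text("C-Count"), new IntWritable(1));
            }
        }
    }
}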
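Note that `split("\\W+")` and `StringTokenizer` do not tokenize a line the same way, so the two mappers in this thread can report different totals on text that contains punctuation. The following plain-Java sketch compares them; the class name `TokenizerSketch`, its two helper methods and the sample line are hypothetical and only serve the illustration.

```java
import java.util.StringTokenizer;

// Compares the two tokenization styles used in this thread, without Hadoop.
class TokenizerSketch {

    // Splits on whitespace only (StringTokenizer's default delimiters),
    // so punctuation stays attached to the token.
    static int countWithTokenizer(String line, String prefix) {
        int count = 0;
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            if (itr.nextToken().startsWith(prefix)) {
                count++;
            }
        }
        return count;
    }

    // Splits on runs of non-word characters, so punctuation is dropped.
    // A leading delimiter produces an empty first element, which never matches.
    static int countWithSplit(String line, String prefix) {
        int count = 0;
        for (String w : line.split("\\W+")) {
            if (w.startsWith(prefix)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Hypothetical line with punctuation around two of the words.
        String line = "(can) \"cat\" could";
        System.out.println(countWithTokenizer(line, "c")); // prints 1: only "could" starts with c
        System.out.println(countWithSplit(line, "c"));     // prints 3: can, cat, could
    }
}
```

Which behaviour is wanted depends on whether a token such as `(can)` should count as a word starting with "c"; the question does not say, so treat this as a design choice rather than a fix.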
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Named WordCountMapper because a class cannot extend the imported Mapper while sharing its simple name.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        String line = ivalue.toString();
        String[] values = line.split(" ");
        IntWritable val = new IntWritable(1);
        for (String i : values) {
            // Consecutive spaces yield empty tokens; charAt(0) would throw on those.
            if (!i.isEmpty() && i.charAt(0) == 'c') {
                context.write(new Text("c"), val);
            }
        }
    }
}
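import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum = sum + val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}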