How to swap keys and values in Java Hadoop MapReduce to sort in ascending order



I want to sort the output by number of visits in ascending order, using only the mapper and reducer classes. My idea is to swap the key and value in the mapper output so that it gets sorted automatically, and then swap them back in the reducer. However, I am having trouble with the data types I should use.
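
The idea behind the swap is that Hadoop sorts mapper output by key, and IntWritable keys sort in ascending numeric order. A minimal sketch of the type signatures this implies (illustrative names, not my real classes; it assumes each input line already holds a url and its count, for example produced by an earlier counting job):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: assumes each input line is "url<TAB>count".
public class SwapSortSketch {

    public static class SwapMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split("\t");
            // Swap: the count becomes the key, so the shuffle sorts
            // records by count, ascending by default for IntWritable.
            context.write(new IntWritable(Integer.parseInt(parts[1])), new Text(parts[0]));
        }
    }

    public static class SwapReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        protected void reduce(IntWritable count, Iterable<Text> urls, Context context)
                throws IOException, InterruptedException {
            // Swap back: emit (url, count), already in ascending count order.
            for (Text url : urls) {
                context.write(url, count);
            }
        }
    }
}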

Here is the mapper:

public class WordCountMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    public void map(LongWritable cle, Text valeur, Context sortie)
            throws IOException {

        String url = "";
        int nbVisites = 0;
        Pattern httplogPattern = Pattern.compile("([^\\s]+) - - \\[(.+)\\] \"([^\\s]+) (/[^\\s]*) HTTP/[^\\s]+\" [^\\s]+ ([0-9]+)");
        String ligne = valeur.toString();
        //if (ligne.length() > 0) {
        Matcher matcher = httplogPattern.matcher(ligne);
        if (matcher.matches()) {
            url = matcher.group(1);
            nbVisites++;

            Text urlText = new Text(url);
            IntWritable value = new IntWritable(nbVisites);
            try {
                sortie.write(value, urlText);
                System.out.println(urlText + " ; " + value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //}
    }
}

The reducer:

public class WordCountReducer extends Reducer<IntWritable, Text, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context sortie)
            throws IOException, InterruptedException {

        Iterator<IntWritable> it = values.iterator();
        int nb = 0;
        while (it.hasNext()) {
            nb = nb + it.next().get();
        }

        try {
            sortie.write(key, new IntWritable(nb));
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

And the main class:

public static void main(String[] args) throws Exception {
    Job job = new Job();

    job.setJobName("TP4");

    job.setJarByClass(WordCount.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileSystem fs = FileSystem.get(job.getConfiguration());
    fs.delete(new Path(args[1]));

    job.waitForCompletion(true);
}

The input file looks like this. Each line contains a URL, and I count how many times each URL appears in the file:
199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0

Here is my current output:

1   199.72.81.55
1   unicomp6.unicomp.net
1   199.120.110.21
1   burger.letters.com
1   199.120.110.21
1   burger.letters.com
1   burger.letters.com
1   205.212.115.106
1   d104.aa.net
1   129.94.144.152
1   unicomp6.unicomp.net
1   unicomp6.unicomp.net
1   unicomp6.unicomp.net
1   d104.aa.net
1   d104.aa.net
1   d104.aa.net

I would like this output in ascending order:

129.94.144.152  1
199.72.81.55    1
205.212.115.106 1
199.120.110.21  2
burger.letters.com  3
d104.aa.net 4
unicomp6.unicomp.net    4

Use NullWritable to force all the data to a single reducer. You can then sort all the values with a TreeMap inside the reducer.
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WebLogDriver extends Configured implements Tool {

    public static final String APP_NAME = WebLogDriver.class.getSimpleName();

    public static void main(String[] args) throws Exception {
        final int status = ToolRunner.run(new Configuration(), new WebLogDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, APP_NAME);
        job.setJarByClass(WebLogDriver.class);

        // outputs for mapper and reducer
        job.setOutputKeyClass(Text.class);

        // setup mapper
        job.setMapperClass(WebLogDriver.WebLogMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // setup reducer
        job.setReducerClass(WebLogDriver.WebLogReducer.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        final Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class WebLogMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        static final Pattern HTTP_LOG_PATTERN = Pattern.compile("(\\S+) - - \\[(.+)] \"(\\S+) (/\\S*) HTTP/\\S+\" \\S+ (\\d+)");

        final Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line.isEmpty()) {
                return;
            }
            Matcher matcher = HTTP_LOG_PATTERN.matcher(line);
            if (matcher.matches()) {
                String url = matcher.group(1);
                valueOut.set(String.format("%s,%d", url, 1)); // count each host once
                context.write(NullWritable.get(), valueOut);
            }
        }
    }

    static class WebLogReducer extends Reducer<NullWritable, Text, Text, IntWritable> {

        static final Text keyOut = new Text();
        static final IntWritable valueOut = new IntWritable();

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Reducer<NullWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // a TreeMap builds in ascending key order by default
            Map<String, Integer> m = new TreeMap<>();
            for (Text t : values) {
                // Text.getBytes() may include stale trailing bytes; toString() is safe
                String v = t.toString();
                String[] parts = v.split(",");
                String host = parts[0];
                int count = m.getOrDefault(host, 0) + Integer.parseInt(parts[1]);
                m.put(host, count);
            }
            for (Map.Entry<String, Integer> e : m.entrySet()) {
                keyOut.set(e.getKey());
                valueOut.set(e.getValue());
                context.write(keyOut, valueOut);
            }
        }
    }
}
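
Note that a TreeMap orders its entries by key, i.e. alphabetically by host. If the output should instead be ordered by visit count, as in the desired output above, the aggregated entries can be re-sorted by value before they are written out. A sketch of that variation, replacing the final loop of the reduce method (it also needs java.util.ArrayList and java.util.List imports):

// Re-sort the aggregated (host, count) entries by count before emitting.
// List.sort is stable, so hosts with equal counts keep the TreeMap's
// alphabetical order, matching the desired output above.
List<Map.Entry<String, Integer>> entries = new ArrayList<>(m.entrySet());
entries.sort(Map.Entry.comparingByValue());
for (Map.Entry<String, Integer> e : entries) {
    keyOut.set(e.getKey());
    valueOut.set(e.getValue());
    context.write(keyOut, valueOut);
}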
