I want to find the country with the largest area, along with that area.
My dataset looks like this:
Afghanistan 648
Albania 29
Algeria 2388
Andorra 0
Austria 84
Bahrain 1
Bangladesh 143
Belgium 31
Benin 113
Bhutan 47
Brunei 6
Bulgaria 111
Burma 678
Cameroon 474
Central-African-Republic 623
Chad 1284
China 9561
Cyprus 9
Czechoslovakia 128
Denmark 43
Djibouti 22
Egypt 1001
Equatorial-Guinea 28
Ethiopia 1222
Finland 337
France 547
Germany-DDR 108
Germany-FRG 249
Greece 132
Guam 0
Hong-Kong 1
Hungary 93
India 3268
Can anyone help me write the MapReduce program? My mapper and reducer code looks like this:
Mapper
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] tokens = value.toString().split(",");
if(Integer.parseInt(tokens[2]) == 1){
context.write(new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[3])));
}
}
Reducer
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
int max = 0;
for(IntWritable x : values){
if(max < x.get()){
max = x.get();
}
}
context.write(key, new IntWritable(max));
}
The algorithm is simple: in the mapper, keep track of the maximum seen so far, and write it out in cleanup when the mapper finishes.
private int max = Integer.MIN_VALUE;
private String token;
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Column indices and the "," delimiter follow the question's mapper.
String[] tokens = value.toString().split(",");
if(Integer.parseInt(tokens[2]) == 1){
int val = Integer.parseInt(tokens[3]);
if(val > max){
max = val;
token = tokens[0];
}
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
// Emit this mapper's maximum once; skip the write if no record matched.
if(token != null){
context.write(new LongWritable(max), new Text(token));
}
}
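The mapper above keeps the column indices from the question's code. To try the same "track the maximum, emit once at the end" idea on the two-column sample as pasted in the question, here is a minimal sketch in plain Java with no Hadoop classes. The class name LocalMaxTracker and the helper run are hypothetical, and the sketch assumes each line is a country name and an area separated by whitespace.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a single mapper task; no Hadoop classes are used.
class LocalMaxTracker {
    private long max = Long.MIN_VALUE;
    private String name = null;

    // Mirrors map(): look at one line and remember the largest area seen so far.
    void map(String line) {
        String[] tokens = line.trim().split("\\s+");
        if (tokens.length < 2) {
            return; // skip blank or malformed lines
        }
        long area = Long.parseLong(tokens[1]);
        if (area > max) {
            max = area;
            name = tokens[0];
        }
    }

    // Mirrors cleanup(): one (area, country) pair, or null if no line was usable.
    Map.Entry<Long, String> cleanup() {
        return name == null ? null : new SimpleEntry<>(max, name);
    }

    // Runs one simulated mapper over a split of input lines.
    static Map.Entry<Long, String> run(List<String> split) {
        LocalMaxTracker tracker = new LocalMaxTracker();
        for (String line : split) {
            tracker.map(line);
        }
        return tracker.cleanup();
    }

    public static void main(String[] args) {
        List<String> split = Arrays.asList(
                "Afghanistan 648", "Albania 29", "Algeria 2388", "China 9561", "India 3268");
        Map.Entry<Long, String> result = run(split);
        System.out.println(result.getValue() + " " + result.getKey()); // prints "China 9561"
    }
}
```

Each mapper emits at most one pair this way, so the amount of data shuffled to the reducer is bounded by the number of map tasks rather than the number of input records.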
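Everything is now reduced on max, so if the keys are sorted in descending order, the maximum arrives as the first record in the reducer. This relies on the job running a single reducer, so that one reduce task sees every key. You therefore need to set the following on the job: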
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
The reducer is a simple found/not-found switch: it writes out the countries only for the first key it sees, which is the maximum.
boolean foundMax = false;
@Override
public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
if(!foundMax){
for(Text t : values){
context.write(t, key);
}
foundMax = true;
}
}
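To see why the descending sort plus the foundMax switch yields only the largest area, the shuffle and reduce steps can be simulated locally. This is a sketch under stated assumptions, not Hadoop code: the class name MaxAreaReduceSketch is hypothetical, a TreeMap with a reversed comparator stands in for the sort comparator, and the three input pairs are assumed to be the per-mapper maxima from three hypothetical splits of the question's data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the shuffle and the reducer; no Hadoop classes are used.
class MaxAreaReduceSketch {

    // pairs: one {area, country} pair per mapper, as written in cleanup().
    static List<String> reduce(List<String[]> pairs) {
        // Group values by key with keys in descending order, as the sort comparator would.
        TreeMap<Long, List<String>> grouped = new TreeMap<>(Comparator.<Long>reverseOrder());
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(Long.parseLong(pair[0]), k -> new ArrayList<>()).add(pair[1]);
        }

        List<String> output = new ArrayList<>();
        boolean foundMax = false;
        // Each loop iteration plays the role of one reduce() call.
        for (Map.Entry<Long, List<String>> group : grouped.entrySet()) {
            if (!foundMax) {
                for (String country : group.getValue()) {
                    output.add(country + " " + group.getKey());
                }
                foundMax = true;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> pairs = Arrays.asList(
                new String[]{"3268", "India"},
                new String[]{"9561", "China"},
                new String[]{"2388", "Algeria"});
        System.out.println(reduce(pairs)); // prints "[China 9561]"
    }
}
```

Because all values for the first key are written, countries that tie for the largest area are all reported.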