我写了一个MapReduce program
来分析用户的dataset
,它是这样的形式
UserID::Gender::Age::MoviesRated::Zip Code
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
I want to
根据用户的平均年龄找到前10个邮政编码邮编,按平均年龄降序排列。前10名表示该邮政编码地区用户最年轻的10个平均年龄。
我有一个MapClass
,一个CombinerClass
和一个ReducerClass
。
我的代码如下
public class TopTenYoungestAverageAgeRaters extends Configured implements Tool {
private static TreeSet<AverageAge> top10 = new TreeSet<AverageAge>();
public static class MapClass extends Mapper<LongWritable, Text, Text, AverageAge>
{
public boolean isNumeric(String value) // Checks if record is valid
{
try
{
Integer.parseInt(value);
return true;
}
catch(NumberFormatException e)
{
return false;
}
}
public AverageAge toCustomWritable(String[] line)
{
AverageAge record = new AverageAge(new IntWritable(Integer.parseInt(line[0])), new IntWritable(Integer.parseInt(line[2])), new Text(line[1]), new IntWritable(Integer.parseInt(line[3])), new Text(line[4]));
return record;
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
String[] values = line.split("::");
if(isNumeric(values[0]))
{
AverageAge customTuple = toCustomWritable(values);
context.write(new Text(values[4]), customTuple);
}
}
}
public static class CombinerClass extends Reducer<Text, AverageAge, Text, AverageAge>
{
public void reduce(Text key, Iterable<AverageAge> values, Context context) throws IOException, InterruptedException
{
AverageAge newRecord = new AverageAge();
long age = 0;
int count = 0;
for(AverageAge value:values)
{
age += value.getUserAge();
count += 1;
}
newRecord.setZipCode(key.toString());
newRecord.setAverageAge((double)(age/count));
context.write(key, newRecord);
}
}
public static class ReducerClass extends Reducer<Text, AverageAge, NullWritable, AverageAge>
{
public void reduce(Text key, Iterable<AverageAge> values, Context context) throws IOException, InterruptedException
{
for(AverageAge value:values)
{
top10.add(value);
if(top10.size() > 10)
top10.remove(top10.last());
}
}
protected void cleanup(Context context) throws IOException, InterruptedException
{
for(AverageAge avg: top10)
{
context.write(NullWritable.get(), avg);
}
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
int res = ToolRunner.run(new Configuration(), new TopTenYoungestAverageAgeRaters(), args);
System.exit(res);
}
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setMapperClass(MapClass.class);
job.setCombinerClass(CombinerClass.class);
job.setReducerClass(ReducerClass.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AverageAge.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(AverageAge.class);
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
MapClass
将带有zipcode的输出写入key
,将AverageAge
(自定义可写类)写入value
CombinerClass
计算属于该zipcode的用户的平均年龄,并将key
写入zipcode,并将值写入AverageAge
。
ReducerClass
给出(应该给出)平均用户年龄的前10个邮政编码,但我只得到一个记录作为输出。
我还尝试在Reducer类中做System.out.println()
,以查看传递给ReducerClass
的值,但console
上没有打印任何内容(我在eclipse环境中本地运行程序)
我是MapReduce的新手,无法找出这个程序中的错误。
数据集源
问题陈述似乎是矛盾的:平均年龄下降的前10名应该是最老的10名,而不是最年轻的10名。
无论如何,这里有很多很多错误。
- 组合子不保证永远被调用
- 如果您有多个reducer任务,您将在不同的文件中从每个reducer获得多达10个输出 如前所述,您将获得的"前10名"将是最低的10个邮政编码(按字典顺序排序)。
- 正常情况下,到
cleanup()
时你不再写记录了。
您想要的是使用shuffle将具有相同邮政编码的记录放在一起,并使用聚合类(Combiner和Reducer)计算平均值。"前10岁"的要求要等到每个邮编都有了年龄之后才能确定。然而,关键的一点是,为了以分布式方式计算平均值,在减少之前永远不能丢失分母。您的车队中的组合器可能会收到具有相同密钥的记录。
Mapper接受一个记录并生成一个三元组:
k::g::a::z |=> z |-> ( 1, a )
Combiner接受具有相同键的三元组集合,并对它们求平均值(并对分母求和):
z |-> [ ( d1, a1 ), ..., ( dn, an ) ] |=> z |-> ( sum( di ), sum( ai ) / sum ( di ) )
Reducer取一组具有相同键的三元组,并取其平均值,抛出分母:
z |-> [ ( d1, a1 ), ..., ( dn, an ) ] |=> z |-> sum( ai ) / sum ( di )
无论您是否提供组合器,您的算法都应该工作;组合器是一种优化,只适用于某些map-约简情况。
为了只保留前10个结果,现在需要按平均年龄重新排序。
这意味着另一个映射器:
z |-> avg |=> avg |-> z
和一个只输出前10个结果的reducer(剩下的练习留给阅读器)。此外,只能有一个reduce任务,否则你会得到前10x,其中x是reduce任务的数量。