I have a large TSV file with input like this:
Site1 Tag1
Site1 Tag34
Site1 Tag8
Site2 Tag75
Site2 Tag54
Site2 Tag8
Site3 Tag24
Site3 Tag34
Site3 Tag1
...
With the help of Hadoop MapReduce, I want to find all pairs of similar sites in the input, along with the number of shared tags for each pair.
Output for the portion of the input shown above:
Site1 Site2 1 // Site1 is similar to Site2 with 1 tag (Tag8)
Site1 Site3 2 // Site1 is similar to Site3 with 2 tags (Tag1 and Tag34)
Site2 Site1 1
Site3 Site1 2
For each site, I want to output only the 10 most similar sites.
Each site has 3 tags.
I want to use 2 MapReduce jobs:
- Map each input line with the tag as key and the site as value, then reduce by tag: in the reduce phase, collect all sites for a given tag and write output lines of the form "tag SiteX SiteY" (a rough sketch of this step follows the list).
- The second MapReduce job takes the first job's output as input and does a GROUP BY on the (SiteX, SiteY) pairs to get the number of shared tags for each pair of similar sites.
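For clarity, here is a minimal sketch of what I expect that first reduce step to produce (illustration only, not my actual code, which follows below; the class name PairingReducer and the tab-delimited format are my own, and the sites are buffered into a list first because Hadoop hands reduce() a one-pass Iterable whose Text objects are reused):

public static class PairingReducer extends Reducer<Text, Text, Text, Text> {
    private final Text pair = new Text();

    @Override
    public void reduce(Text tag, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Buffer first: the values Iterable can only be traversed once.
        java.util.List<String> sites = new java.util.ArrayList<>();
        for (Text site : values)
            sites.add(site.toString());
        // Emit every ordered pair of distinct sites sharing this tag.
        for (String a : sites)
            for (String b : sites)
                if (!a.equals(b)) {
                    pair.set(a + "\t" + b);
                    context.write(tag, pair);
                }
    }
}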
I tried to implement the first MapReduce job, but all I get is "tag, site" output.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RawToSimilarTagMapper {

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> {
        private Text site = new Text();
        private Text tag = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] siteTag = value.toString().split("\t");
            site.set(siteTag[0]);
            tag.set(siteTag[1]);
            context.write(tag, site);
            System.out.println();
        }
    }

    public static class SimilarSiteReducer extends Reducer<Text, Text, Text, Text> {
        private Text value = new Text();

        public void reduce(Text key, Iterable<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException, InterruptedException {
            for (Text text : values) {
                for (Text text2 : values) {
                    if (!text.equals(text2)) {
                        value.set(text.toString() + "\t" + text2.toString());
                        output.collect(key, value);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "raw-to-similar");
        job.setJarByClass(RawToSimilarTagMapper.class);
        job.setMapperClass(TagToSiteMapper.class);
        job.setCombinerClass(SimilarSiteReducer.class);
        job.setReducerClass(SimilarSiteReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        FileSystem fs = null;
        Path dstFilePath = new Path(args[2]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
What am I doing wrong here?
Also, in the next stage, how can I get only the top 10 most similar sites for each site?
This is how I did it. Additionally, you can sort the output of the second job with a third job to get the top ten sites per site (hint: you only need to write the mapper). Note: this works for the sample data provided; you may need to do some initial cleanup on malformed data.
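As a rough sketch of that third, map-only job (my illustration only: TopTenMapper is a made-up name, the code assumes the second job's output lines look like "SiteX SiteY count" separated by tabs, and it buffers everything in memory, so it is only safe when the job runs as a single mapper over a small input):

public static class TopTenMapper extends Mapper<Object, Text, Text, IntWritable> {
    // site -> all (otherSite, count) rows seen for it
    private final java.util.Map<String, java.util.List<String[]>> rows = new java.util.HashMap<>();

    @Override
    public void map(Object key, Text value, Context context) {
        String[] f = value.toString().split("\t"); // SiteX, SiteY, count
        if (f.length == 3)
            rows.computeIfAbsent(f[0], k -> new java.util.ArrayList<>()).add(new String[] { f[1], f[2] });
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (java.util.Map.Entry<String, java.util.List<String[]>> e : rows.entrySet()) {
            // Sort this site's rows by count, descending, and keep the first 10.
            e.getValue().sort((a, b) -> Integer.parseInt(b[1]) - Integer.parseInt(a[1]));
            for (String[] r : e.getValue().subList(0, Math.min(10, e.getValue().size())))
                context.write(new Text(e.getKey() + "\t" + r[0]), new IntWritable(Integer.parseInt(r[1])));
        }
    }
}

Setting the job's reduce task count to 0 (job.setNumReduceTasks(0)) makes it map-only, so this mapper's output is written straight to the output files.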
Final output:
Site2 2
Site2 Site1 1
Site3 1
Site3 Site1 2
Code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopSites {

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] siteTag = value.toString().split("\t");
            context.write(new Text(siteTag[1]), new Text(siteTag[0]));
            System.out.println(siteTag[1] + " --> " + siteTag[0]);
        }
    }

    public static class TagToSiteReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String l = "";
            System.out.print("Key: [" + key.toString() + "] Values: [");
            for (Text site : values)
                l += site + "\t";
            l = l.substring(0, l.length() - 1);
            System.out.println(l + "]");
            context.write(new Text(key), new Text(l));
        }
    }

    public static class TopSiteMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split("\t");
            String sites = "";
            System.out.println("map received: " + value.toString());
            for (int i = 1; i < data.length; i++)
                sites += data[i] + "\t";
            System.out.println(sites.substring(0, sites.length() - 1));
            context.write(new Text(sites.substring(0, sites.length() - 1)), one);
        }
    }

    public static class TopSiteReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            System.out.print("Key: [" + key.toString() + "] Values: [");
            for (IntWritable site : values) {
                System.out.print(site.get());
                sum += site.get();
            }
            System.out.println("]");
            context.write(new Text(key), new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "site-to-tag");
        job.setJarByClass(TopSites.class);
        job.setMapperClass(TagToSiteMapper.class);
        job.setReducerClass(TagToSiteReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TagToSiteMapper.class);

        Path outputpath = new Path(args[1] + "_temp");
        FileOutputFormat.setOutputPath(job, outputpath);

        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
            dstFilePath = new Path(args[1] + "_temp");
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        int code = job.waitForCompletion(true) ? 0 : 1;
        if (code == 0) {
            Job SecondJob = Job.getInstance(conf, "Tag-to-Sites");
            SecondJob.setJarByClass(TopSites.class);
            SecondJob.setOutputKeyClass(Text.class);
            SecondJob.setOutputValueClass(IntWritable.class);
            SecondJob.setMapperClass(TopSiteMapper.class);
            SecondJob.setCombinerClass(TopSiteReducer.class);
            SecondJob.setReducerClass(TopSiteReducer.class);

            FileInputFormat.addInputPath(SecondJob, new Path(args[1] + "_temp"));
            FileOutputFormat.setOutputPath(SecondJob, new Path(args[1]));

            int exitCode = SecondJob.waitForCompletion(true) ? 0 : 1;
            FileSystem.get(conf).delete(new Path(args[1] + "_temp"), true);
            System.exit(exitCode);
        }
    }
}
Console stdout:
Tag1 --> Site1
Tag34 --> Site1
Tag8 --> Site1
Tag75 --> Site2
Tag54 --> Site2
Tag8 --> Site2
Tag24 --> Site3
Tag34 --> Site3
Tag1 --> Site3
Key: [Tag1] Values: [Site3 Site1]
Key: [Tag24] Values: [Site3]
Key: [Tag34] Values: [Site3 Site1]
Key: [Tag54] Values: [Site2]
Key: [Tag75] Values: [Site2]
Key: [Tag8] Values: [Site2 Site1]
map received: Tag1 Site3 Site1
Site3 Site1
map received: Tag24 Site3
Site3
map received: Tag34 Site3 Site1
Site3 Site1
map received: Tag54 Site2
Site2
map received: Tag75 Site2
Site2
map received: Tag8 Site2 Site1
Site2 Site1
Key: [Site2] Values: [11]
Key: [Site2 Site1] Values: [1]
Key: [Site3] Values: [1]
Key: [Site3 Site1] Values: [11]
Key: [Site2] Values: [2]
Key: [Site2 Site1] Values: [1]
Key: [Site3] Values: [1]
Key: [Site3 Site1] Values: [2]
It looks like your combiner is causing the trouble here. The combiner's output must look exactly like the mapper's output, which is not the case in your code. Since the combiner is only used as a performance optimization, could you comment it out and run it again?
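Concretely, that would mean disabling one line in the first job's driver (a sketch of the suggestion above):

Job job = Job.getInstance(conf, "raw-to-similar");
job.setJarByClass(RawToSimilarTagMapper.class);
job.setMapperClass(TagToSiteMapper.class);
// A combiner must emit records that look like the mapper's output, because
// the reducer may receive either. SimilarSiteReducer emits site pairs
// instead of single sites, so disable it for now:
// job.setCombinerClass(SimilarSiteReducer.class);
job.setReducerClass(SimilarSiteReducer.class);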