我一直在尝试连接来自两个数据集的字段,但没有成功。如果有人能帮助我实现这一目标,我将不胜感激。我一直在尝试的文件和代码如下
电影元数据
975900 /m/03vyhn Ghosts of Mars 2001-08-24 14010832 98.0 {"/m/02h40lc": "English Language"} {"/m/09c7w0": "United States of America"} {"/m/01jfsb": "Thriller", "/m/06n90": "Science Fiction", "/m/03npn": "Horror", "/m/03k9fj": "Adventure", "/m/0fdjb": "Supernatural", "/m/02kdv5l": "Action", "/m/09zvmj": "Space western"}
3196793 /m/08yl5d Getting Away with Murder: The JonBenét Ramsey Mystery 2000-02-16 95.0 {"/m/02h40lc": "English Language"} {"/m/09c7w0": "United States of America"} {"/m/02n4kr": "Mystery", "/m/03bxz7": "Biographical film", "/m/07s9rl0": "Drama", "/m/0hj3n01": "Crime Drama"}
28463795 /m/0crgdbh Brun bitter 1988 83.0 {"/m/05f_3": "Norwegian Language"} {"/m/05b4w": "Norway"} {"/m/0lsxr": "Crime Fiction", "/m/07s9rl0": "Drama"}
9363483 /m/0285_cd White Of The Eye 1987 110.0 {"/m/02h40lc": "English Language"} {"/m/07ssc": "United Kingdom"} {"/m/01jfsb": "Thriller", "/m/0glj9q": "Erotic thriller", "/m/09blyk": "Psychological thriller"}
字符元数据
975900 /m/03vyhn 2001-08-24 Akooshay 1958-08-26 F 1.62 Wanda De Jesus 42 /m/0bgchxw /m/0bgcj3x /m/03wcfv7
975900 /m/03vyhn 2001-08-24 Lieutenant Melanie Ballard 1974-08-15 F 1.78 /m/044038p Natasha Henstridge 27 /m/0jys3m /m/0bgchn4 /m/0346l4
975900 /m/03vyhn 2001-08-24 Desolation Williams 1969-06-15 M 1.727 /m/0x67 Ice Cube 32 /m/0jys3g /m/0bgchn_ /m/01vw26l
975900 /m/03vyhn 2001-08-24 Sgt Jericho Butler 1967-09-12 M 1.75 Jason Statham 33 /m/02vchl6 /m/0bgchnq /m/034hyc
在第一个文件中,我对第一个字段感兴趣,即电影 ID 和第三个字段电影名称。在第二个文件中,第一个字段是电影 ID,第 9 个字段是演员姓名。每个电影 ID 可以有多个演员名称,如上面的文件 2 所示。我尝试实现的输出采用以下格式
movieId movieName, actorName1, actorName2, actorName3....etc.
我已经成功地从两个映射器类中提取了字段。在 reducer 类中,我的代码似乎没有达到我打算作为输出的格式。我得到的输出为
movieId movieName, actorName1
我没有得到演员的其余名字。请查看我的代码并相应地更正我。
public class Join {
public static void main(String[] args) throws Exception {
if (args.length != 3) {
System.err.println("Usage: Join <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJobName("Join");
job.setJarByClass(Join.class);
job.setReducerClass(JoinReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, JoinMap1.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, JoinMap2.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class JoinMap1 extends
Mapper<LongWritable, Text, Text, Text> {
private String movieId, movieName, fileTag = "A~ ";
@Override
public void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String values[] = value.toString().split("t");
movieId = values[0].trim();
movieName = values[2].trim().replaceAll("t", "movie Name");
context.write(new Text(movieId), new Text (fileTag + movieName));
}
}
public static class JoinMap2 extends Mapper<LongWritable, Text, Text, Text>{
private String movieId, actorName, fileTag = "B~ ";
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String values[] = line.toString().split("t");
movieId = values[0].trim();
actorName = values[8].trim().replaceAll("t", "actor Name");
context.write(new Text (movieId), new Text (fileTag + actorName));
}
}
public static class JoinReduce extends
Reducer<Text, Text, Text, Text> {
private String movieName, actorName;
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
for (Text value : values){
String currValue = value.toString();
String splitVals[] = currValue.split("~");
if(splitVals[0].equals("A")){
movieName = splitVals[1] != null ? splitVals[1].trim() : "movieName";
} else if (splitVals[0].equals("B")){
actorName= splitVals[1] != null ? splitVals[1].trim() : "actorName";
}
}
context.write(key, new Text (movieName + ", " + actorName));
}
}
}
请告诉我可以做什么,以便我可以实现如上所示的输出。任何帮助将不胜感激。欢迎砖块和蝙蝠。
即使你的代码遍历所有值,它似乎也不会累积演员名称,而是不断用新的角色名称覆盖当前的参与者名称。
取而代之的是:
actorName= splitVals[1] != null ? splitVals[1].trim() : "actorName";
试试这个:
actorName += splitVals[1] != null ? splitVals[1].trim() : "actorName" + ",";
嗨~我刚刚读了扔你的代码。我和格温有同样的建议。如果您希望结果记录具有"电影ID"+"电影名称"+"演员"。您必须同时将所有输出值放入 context.write() 中。所以格温建议的是你必须做的。
我认为工作失败不是Mapreduce的问题,而是HDFS的问题。查看"Hadoop:文件...只能复制到 0 个节点,而不是 1"。
我很好奇的一件事是JoinMap2部分。
String values[] = line.toString().split("t");
movieId = values[0].trim();
actorName = values[8].trim().replaceAll("t", "actor Name");
用"\t"分隔行,因此这意味着任何值[]单元格内不得有"\t"。
所以你真的想让它在第三行做?把"\t"换成"演员名字"?值[8]中没有"\t"。
你至少需要做三件事来完成你的MapReduce作业。
- 修复你的HDFS。
- 重写 JoinMap2 以确保它输出您想要的答案 。 演员。
- 重写减速器,正如格温所说。