我试图写一个Map Reduce程序来做两个文本文件之间的连接。我得到的输出,只针对其中一个键。例如,如果我有一个文件R.txt
,数据为
a4 b3
a3 b4
和另一个文件S.txt
,数据为
b3 c3
b3 c1
b3 c2
b4 c4
a4 c2
a4 c1
A4 c3
而如果R.txt
有
b4 c4
, S.txt
有
A3 b4
输出为
a3 c4。
这是我的程序
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class RSJoin{
public static class SMap extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
context.write(new Text(words[0]), new Text("St"+words[1]));
}
}
public static class RMap extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
context.write(new Text(words[1]), new Text("Rt"+words[0]));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
String [] parts = val.toString().split("t");
String a=parts[0];
if (a.equals("R")){
for (Text val1 : values){
String [] parts1=val1.toString().split("t");
String b=parts1[0];
if (b.equals("S")){
context.write(new Text(parts[1]), new Text(parts1[1]));
}
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "ReduceJoin");
job.setJarByClass(RSJoin.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(Reduce.class);
MultipleInputs.addInputPath(job,new Path(args[0]),TextInputFormat.class,RMap.class);
MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,SMap.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.waitForCompletion(true);
}
}
您的联接逻辑假定在值列表中R值位于S值之前。只有当你看到R的时候,你才开始寻找S。Iterable的内部for从外部for结束的地方开始,所以如果S先出现,你的9循环就找不到它。
如果你的多个S值只有一个R值,要么进行二次排序(将"R"one_answers"S"添加到键中,添加分区器和添加分组比较器——这是正确的方法),要么在找到R值时使用一个变量来保存R值,使用一个列表来保存S值,直到找到R值(不太好缩放),并在整个值集中进行一次迭代。
我更改了减速器代码如下,并得到了预期的输出
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
List<String> listR = new ArrayList <String>();
List<String> listS = new ArrayList <String>();
for (Text val : values) {
String [] parts = val.toString().split("t");
String a=parts[0];
if (a.equals("R")){
listR.add(parts[1]);
}
else if (a.equals("S")){
listS.add(parts[1]);
}
}
for (String Temp: listR)
{
for (String Temp1: listS)
{
context.write(new Text(Temp), new Text(Temp1));
}
}
}