在map reduce程序中只获取一个键的输出



我试图写一个Map Reduce程序来做两个文本文件之间的连接。我得到的输出,只针对其中一个键。例如,如果我有一个文件R.txt,数据为

a4 b3
a3 b4

和另一个文件S.txt,数据为
b3 c3
b3 c1
b3 c2
b4 c4

我得到了输出

a4 c2
a4 c1
A4 c3

而如果R.txt
b4 c4

S.txt
A3 b4

输出为
a3 c4。

这是我的程序

import java.io.IOException;
import java.util.*;     
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
public class RSJoin{
    public static class SMap extends Mapper<Object, Text, Text, Text>{ 
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            context.write(new Text(words[0]), new Text("St"+words[1]));
            }
}
    public static class RMap extends Mapper<Object, Text, Text, Text>{ 
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            context.write(new Text(words[1]), new Text("Rt"+words[0]));
            }
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text val : values) {
            String [] parts = val.toString().split("t");
            String a=parts[0];
            if (a.equals("R")){
                for (Text val1 : values){
                String [] parts1=val1.toString().split("t");
                String b=parts1[0];
                if (b.equals("S")){
                    context.write(new Text(parts[1]), new Text(parts1[1]));
                }
                }
            }
        }
  }
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "ReduceJoin");
    job.setJarByClass(RSJoin.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setReducerClass(Reduce.class);
    MultipleInputs.addInputPath(job,new Path(args[0]),TextInputFormat.class,RMap.class);
    MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,SMap.class);

    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.waitForCompletion(true);
    }
}

您的联接逻辑假定在值列表中R值位于S值之前。只有当你看到R的时候,你才开始寻找S。Iterable的内部for从外部for结束的地方开始,所以如果S先出现,你的9循环就找不到它。

如果你的多个S值只有一个R值,要么进行二次排序(将"R"one_answers"S"添加到键中,添加分区器和添加分组比较器——这是正确的方法),要么在找到R值时使用一个变量来保存R值,使用一个列表来保存S值,直到找到R值(不太好缩放),并在整个值集中进行一次迭代。

我更改了减速器代码如下,并得到了预期的输出

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    List<String> listR = new ArrayList <String>();
    List<String> listS = new ArrayList <String>();
    for (Text val : values) {
        String [] parts = val.toString().split("t");
        String a=parts[0];
        if (a.equals("R")){
            listR.add(parts[1]);    
        }
        else if (a.equals("S")){
            listS.add(parts[1]);
            }
        }
    for (String Temp: listR)
    {
        for (String Temp1: listS)
        {
            context.write(new Text(Temp), new Text(Temp1));
        }
    }
    }

相关内容

  • 没有找到相关文章

最新更新