卡在减少联接代码中



我有两个数据集。下面给出了两者
第一个数据集

1   A
2   B
3   C
4   D
5   E 


第二个数据集

1   ALPHA
2   BRAVO
3   CHARLIE
4   DELTA
5   ECHO


我想使用reduce-side联接来联接此数据集
最终数据应该显示为这样


A   ALPHA
B   BRAVO
C   CHARLIE
D   DELTA
E   ECHO


我写了以下代码
映射器(从第一个数据集提取数据)

public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
    private String tokens[];
    public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
        tokens=value.toString().split("t");
        context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"t"+tokens[1].trim()));
    }
}

映射器(从第二个数据集提取数据)

public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
        private String tokens[];
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("t");
            context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"t"+tokens[1].trim()));
        }
}

还原器(根据需要加入数据)

public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
    private String output1=new String();
    private String output2=new String();
    private TreeMap<String,String> x1=new TreeMap<String,String>();
    private String tokens[];
    public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("t");
            if(tokens[0].contains("m1"))
            {
                output1=tokens[1];
            }else if(tokens[0].contains("m2"))
            {
                output2=(tokens[1]);
            }
            x1.put(output2, output1);
        cleanup(context);
}
    public void cleanup(Context context)throws IOException,InterruptedException{
        for(Entry y:x1.entrySet())
        {
            context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
        }
    }
}

在驾驶员类别中,包含了以下行

MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);


我得到了一个类似下面给出的输出,但根本没有达到预期。

1       m1      A
1       m2      ALPHA
2       m2      BRAVO
2       m1      B
3       m1      C
3       m2      CHARLIE
4       m2      DELTA
4       m1      D
5       m1      E
5       m2      ECHO

尽管我还没有将索引包含在context.write()中,但我绝对不明白为什么要打印索引
我甚至使用了cleanup(),仍然得到了相同的结果。请建议如何获得所需的结果,如上所述。

衷心感谢那个让我摆脱困境的人:)

稍后,经过一些修改,我得到了这个输出

m1      E
m1      D
m1      C
m1      B
m1      A
m2      ECHO
m2      DELTA
m2      CHARLIE
m2      BRAVO
m2      ALPHA

修改后的代码可能如下

 public void reduce(IntWritable key,Iterabale<Text> values,Context context)throws IOException,InterruptedException{
               for(Text value : values) {
                tokens=values.toString().split("t");
                if(tokens[0].contains("m1"))
                {
                    output1=tokens[1];
                }else if(tokens[0].contains("m2"))
                {
                    output2=(tokens[1]);
                }
                x1.put(output2, output1);
               }
            cleanup(context);
    }
reducer方法应该有key和Iterable值作为参数。每个缩减器都将具有以下格式的数据

{1,{"m1 A"、"m2 ALPHA"}}、{1、{"m2 BA"、"m2 BRAVO"}。

请重新检查减速器方法的签名。我想,一旦解决了这个问题,如果你的数据是一对一的,你就可以相应地进行映射。如果是一对多,您可能有多个m1或m2,为此,您需要决定如何管理多个值(映射保持为逗号分隔或json或xml字符串),然后输出最终值。

相关内容

  • 没有找到相关文章

最新更新