我有两个数据集。下面给出了两者
第一个数据集
1 A
2 B
3 C
4 D
5 E
第二个数据集
1 ALPHA
2 BRAVO
3 CHARLIE
4 DELTA
5 ECHO
我想使用reduce-side联接来联接此数据集
最终数据应该显示为这样
A ALPHA
B BRAVO
C CHARLIE
D DELTA
E ECHO
我写了以下代码
映射器(从第一个数据集提取数据)
public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"t"+tokens[1].trim()));
}
}
映射器(从第二个数据集提取数据)
public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"t"+tokens[1].trim()));
}
}
还原器(根据需要加入数据)
public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
private String output1=new String();
private String output2=new String();
private TreeMap<String,String> x1=new TreeMap<String,String>();
private String tokens[];
public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
cleanup(context);
}
public void cleanup(Context context)throws IOException,InterruptedException{
for(Entry y:x1.entrySet())
{
context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
}
}
}
在驾驶员类别中,包含了以下行
MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);
我得到了一个类似下面给出的输出,但根本没有达到预期。
1 m1 A
1 m2 ALPHA
2 m2 BRAVO
2 m1 B
3 m1 C
3 m2 CHARLIE
4 m2 DELTA
4 m1 D
5 m1 E
5 m2 ECHO
尽管我还没有将索引包含在context.write()
中,但我绝对不明白为什么要打印索引
我甚至使用了cleanup(),仍然得到了相同的结果。请建议如何获得所需的结果,如上所述。
衷心感谢那个让我摆脱困境的人:)
稍后,经过一些修改,我得到了这个输出
m1 E
m1 D
m1 C
m1 B
m1 A
m2 ECHO
m2 DELTA
m2 CHARLIE
m2 BRAVO
m2 ALPHA
修改后的代码可能如下
public void reduce(IntWritable key,Iterabale<Text> values,Context context)throws IOException,InterruptedException{
for(Text value : values) {
tokens=values.toString().split("t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
}
cleanup(context);
}
{1,{"m1 A"、"m2 ALPHA"}}、{1、{"m2 BA"、"m2 BRAVO"}。
请重新检查减速器方法的签名。我想,一旦解决了这个问题,如果你的数据是一对一的,你就可以相应地进行映射。如果是一对多,您可能有多个m1或m2,为此,您需要决定如何管理多个值(映射保持为逗号分隔或json或xml字符串),然后输出最终值。