我正在执行map reduce中的连接操作。我屈服于用分隔符(逗号)分隔值的两个文件。我可以通过在一个公共实体上执行连接操作,将两个输入文件中的输出放在一个文件中。
下面是map reduce代码:
public class EmpMapReduce {
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String tokens [] = value.toString().split(",");
String empid = tokens[0];
String val = "";
if(tokens.length != 0)
{
for (int cnt = 1; cnt < tokens.length; cnt++)
{
val = val + tokens[cnt] + "t";
}
}
context.write(new Text(empid), new Text(val));
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException
{
String str = "";
for (Text val : values)
{
str = str + val.toString() + "t";
}
context.write(key, new Text (str));
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: EmpMapReduce <in1> <in2> <out>");
System.exit(2);
}
Job job = new Job(conf, "EmpMapReduce");
job.setJarByClass(EmpMapReduce.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
以下是我使用的两个输入文件内容:
100,name100,10
101,name101,11
102,name102,12
103,name103,13
104,name104,14
105,name105,15
106,name106,16
107,name107,17
第二个输入文件:
100,100000
101,200000
102,300000
103,400000
104,500000
105,600000
106,700000
107,800000
我得到如下输出:
100,name100,10,100000
101,200000,name101,11
102,name102,12,300000
103,400000,name103,13
104,name104,14,500000
105,600000,name105,15
106,name106,16,700000
107,800000,name107,17
现在我关心的是为什么我得到这样的输出:
100,name100,10,100000
101,200000,name101,11
第一行数据中的首先从一个输入文件复制,然后从另一个输入文件复制。但是第二行是相反的。我想不出怎样才能使每一行数据的顺序保持一致。
另一个问题是:一旦我在所有行中以特定顺序获得数据,我就可以执行各种操作,例如:替换name100 -> somenewname或在每行末尾添加新的逗号分隔值,该值等于该行先前所有值的总和
两个映射器的输出到达reducer的顺序未指定。所以你需要一些方法在减速机中识别它们。
一个简单的解决方案是:
- 有两个映射器,每个输入 一个映射器
- 每个映射器输出"[type]:[rest of value]"的值
- 假设你有两种类型(用户,事务),现在每一种都被识别。
- 现在在您的减速器(抱歉伪代码):
void reduce(..) {
String user = "";
String trans = "";
for(value: values) {
(type, payload) = value.split();
if (type == "user") user = payload;
if (type == "transaction") transaction = payload;
}
context.write(user + "t" + transaction);
}
对于Matthew的解决方案,您可能需要将此放入循环中以等待设置的所有值以获得正确的结果:
if(!user.equals("") && !trans.equals("")){
str = str + user+ "t" + trans+ "t";
}