我有三个MapReduce作业,它们生成以制表符分隔的文件,对相同的文件进行操作。第一个值是关键。这三个MR作业的每一个输出都是如此。
我现在想做的是使用MapReduce通过键将这些文件"缝合"在一起。什么是最佳的Mapper输出和Reducer输入?我尝试使用ArrayWritable,但由于无序排列,对于一些记录,ArrayWrutable from 1文件位于第三个位置,而不是第二个位置。
我想要这个:
Key t Values-from-first-MR-job t Values-from-second-MR-job t Values-from-third-MR-job
对于所有记录,这应该是相同的。但是,正如我所说,由于洗牌,有时这种情况会发生在一些唱片上:
Key t Values-from-third-MR-job t Values-from-first-MR-job t Values-from-second-MR-job
我应该如何设置我的Mapper和Reducer来解决这个问题?
由于只涉及三种类型的文件,因此可以对发出的值进行简单标记。在地图中提取分割的路径,识别其位置并为值添加合适的前缀。为了清楚起见,假设输出在3个目录中:
- 路径1/mr_out_1
- 路径2/mr_out_2
- 路径3/mr_out_3
对所有这些路径使用TextInputForamt
,在map
中,您将执行:
String[] keyVal = value.spilt("t",2);
Path filePath = ((FileSplit) context.getInputSplit()).getPath();
String dirName = filePath.getParent().getName().toString();
Text outValue = new Text();
if(dirName.equals("mr_out_1")){
outValue.set("1_" + keyVal[1]);
} else if(dirName.equals("mr_out_2")){
outValue.set("2_" + keyVal[1]);
} else {
outValue.set("3_" + keyVal[1]);
}
context.write(new Text(keyVal[0]), outVal);
如果所有文件都在同一目录中,请使用fileName而不是dirName。然后根据名称识别标志(正则表达式匹配可能合适):
String fileName = filePath.getName().toString();
if(fileName.matches("regex")){ ... }
在reduce
中,只需将传入的值放入List并排序。Rest就足够简单了。
List<String> list = new ArrayList<String>(3);
for(Text v : values){
list.add(v.toString());
}
Collections.sort(list);
StringBuilder builder = new StringBuilder();
for(String s : list){
builder.append(s.substring(2)+"t");
}
context.write(key, new Text(builder.toString().trim()));
我认为这将达到目的。请记住,如果文件超过9个(按字母顺序),Collection.sort
策略将失败。然后可以单独提取标记,将其强制转换为Integer
,并使用TreeMap<tag, actualString>
进行排序。
注:以上所有的代码片段都在使用新的API。我没有使用IDE来编写这些,所以可能很少存在语法错误。再说一次,我在写作中没有遵循适当的惯例。假设map
的outKey可以是类成员,并且使用outKey.set(keyVal[0])
可以消除Text
对象创建开销。