我在reduce阶段的reduce程序中使用MultipleOutput。我正在处理的数据集大约是270mb,我在我的伪分布式单节点上运行它。我使用了自定义可写的映射输出值。密钥是数据集中存在的国家。
public class reduce_class extends Reducer<Text, name, NullWritable, Text> {
public void reduce(Text key,Iterable<name> values,Context context) throws IOException, InterruptedException{
MultipleOutputs<NullWritable,Text> m = new MultipleOutputs<NullWritable,Text>(context);
long pat;
String n;
NullWritable out = NullWritable.get();
TreeMap<Long,ArrayList<String>> map = new TreeMap<Long,ArrayList<String>>();
for(name nn : values){
pat = nn.patent_No.get();
if(map.containsKey(pat))
map.get(pat).add(nn.getName().toString());
else{
map.put(pat,(new ArrayList<String>()));
map.get(pat).add(nn.getName().toString());}
}
for(Map.Entry entry : map.entrySet()){
n = entry.getKey().toString();
m.write(out, new Text("--------------------------"), key.toString());
m.write(out, new Text(n), key.toString());
ArrayList<String> names = (ArrayList)entry.getValue();
Iterator i = names.iterator();
while(i.hasNext()){
n = (String)i.next();
m.write(out, new Text(n), key.toString());
}
m.write(out, new Text("--------------------------"), key.toString());
}
m.close();
}
}
以上是我的归约逻辑
问题
1) 上面的代码适用于较小的数据集,但由于270mb数据集的堆空间而失败。
2) 使用country作为键在单个可迭代集合中传递相当大的值。我试图解决这个问题,但MutlipleOutputs为一组给定的键创建了唯一的文件。重点是,我无法附加由之前运行的reduce-and-throws错误创建的现有文件。因此,对于特定的密钥,我必须创建新的文件。有办法解决这个问题吗。解决上述错误导致我将键定义为国家名称(我最终排序的数据),但引发了java堆错误。
样本输入
3858241,"Durand","Philip","E.",",","Hudson","MA","US",",13858241,"Norris","Lonnie","H.",",","Milford","MA","US",",23858242,"Gooding","Elwyn","R.",","120 Darwin Rd.","Pinckney","MI","US","48169",13858243,"Pierron","Claude","Raymond",",","Epinal"3858243,"Jenny","Jean","Paul",",","Decines",","FR","23858243,"Zuccaro","Robert",","3858244,"Mann","Richard","L.",","P.O.Box 69","Woodstock","CT","US","06281",1
小型数据集的样本输出
示例目录结构
CA-r-00000
FR-r-000000
魁北克省-00000
TX-r-000000
500000美元
*单个内容*
3858241Philip E.Durand
Lonnie H.Norris
3858242
Elwyn R.Gooding
3858244
Richard L.Mann
我知道我在这里回答一个非常古老的问题,但无论如何,让我在这里提出一些想法。您似乎正在reduce中创建一个TreeMap,其中包含您在一个reduce调用中获得的所有记录。在Mapreduce中,您无法将所有记录保存在内存中,因为它永远不会缩放。您正在制作patent_no
和与该patent_no
关联的所有names
的映射。您所想要的只是基于patent_no
来分离记录,所以为什么不利用mapreduce框架的排序呢。
您应该将patent_no
和name
以及country
包含在可写密钥本身中。
- 只在
country
的基础上将Partitioner
写入分区 - 排序应在
country
、patent_no
、name
上 - 您应该在
country
、patent_no
时将您的Grouping comparator
写入组
因此,具有相同country
的所有记录都将进入相同的reducer,并按patent_no
和name
进行排序。在同一个reduce中,不同的patent_no将转到不同的reduce调用。现在,您只需要简单地将其写入MultipleOutput。这样你就可以去掉任何内存中的TreeMap。
我建议你应该注意的几点是:
- 不要每次都在reduce方法中创建
new MultipleOutputs
,而是应该编写一个setup()
方法,并在setup()
方法中只创建一个 - 不要每次都创建
new Text()
,而是在setup方法中创建一个,并通过Text
的set("string")
方法重用同一个实例。你可以争辩说,这有什么意义,Java的GC无论如何都会垃圾收集。但是,您应该始终尽量使用尽可能低的内存,这样Java的垃圾回收就应该不那么频繁地被调用