MultipleOutputs in hadoop



我在reduce阶段的reduce程序中使用MultipleOutput。我正在处理的数据集大约是270mb,我在我的伪分布式单节点上运行它。我使用了自定义可写的映射输出值。密钥是数据集中存在的国家。

public class reduce_class extends Reducer<Text, name, NullWritable, Text> {
public void reduce(Text key,Iterable<name> values,Context context) throws IOException, InterruptedException{
MultipleOutputs<NullWritable,Text> m = new MultipleOutputs<NullWritable,Text>(context);
long pat;
String n;
NullWritable out = NullWritable.get();
TreeMap<Long,ArrayList<String>> map = new TreeMap<Long,ArrayList<String>>();
for(name nn : values){
pat = nn.patent_No.get();
if(map.containsKey(pat))
map.get(pat).add(nn.getName().toString());
else{
map.put(pat,(new ArrayList<String>()));
map.get(pat).add(nn.getName().toString());}
}
for(Map.Entry entry : map.entrySet()){
n = entry.getKey().toString();
m.write(out, new Text("--------------------------"), key.toString());
m.write(out, new Text(n), key.toString());
ArrayList<String> names = (ArrayList)entry.getValue();
Iterator i = names.iterator();
while(i.hasNext()){
n = (String)i.next();
m.write(out, new Text(n), key.toString());
}
m.write(out, new Text("--------------------------"), key.toString());           
}
m.close();
}

}

以上是我的归约逻辑

问题

1) 上面的代码适用于较小的数据集,但由于270mb数据集的堆空间而失败。

2) 使用country作为键在单个可迭代集合中传递相当大的值。我试图解决这个问题,但MutlipleOutputs为一组给定的键创建了唯一的文件。重点是,我无法附加由之前运行的reduce-and-throws错误创建的现有文件。因此,对于特定的密钥,我必须创建新的文件。有办法解决这个问题吗。解决上述错误导致我将键定义为国家名称(我最终排序的数据),但引发了java堆错误。

样本输入

3858241,"Durand","Philip","E.",",","Hudson","MA","US",",13858241,"Norris","Lonnie","H.",",","Milford","MA","US",",23858242,"Gooding","Elwyn","R.",","120 Darwin Rd.","Pinckney","MI","US","48169",13858243,"Pierron","Claude","Raymond",",","Epinal"3858243,"Jenny","Jean","Paul",",","Decines",","FR","23858243,"Zuccaro","Robert",","3858244,"Mann","Richard","L.",","P.O.Box 69","Woodstock","CT","US","06281",1

小型数据集的样本输出

示例目录结构

CA-r-00000

FR-r-000000

魁北克省-00000

TX-r-000000

500000美元

*单个内容*


3858241Philip E.Durand

Lonnie H.Norris


3858242

Elwyn R.Gooding


3858244

Richard L.Mann


我知道我在这里回答一个非常古老的问题,但无论如何,让我在这里提出一些想法。您似乎正在reduce中创建一个TreeMap,其中包含您在一个reduce调用中获得的所有记录。在Mapreduce中,您无法将所有记录保存在内存中,因为它永远不会缩放。您正在制作patent_no和与该patent_no关联的所有names的映射。您所想要的只是基于patent_no来分离记录,所以为什么不利用mapreduce框架的排序呢。

您应该将patent_noname以及country包含在可写密钥本身中。

  • 只在country的基础上将Partitioner写入分区
  • 排序应在countrypatent_noname
  • 您应该在countrypatent_no时将您的Grouping comparator写入组

因此,具有相同country的所有记录都将进入相同的reducer,并按patent_noname进行排序。在同一个reduce中,不同的patent_no将转到不同的reduce调用。现在,您只需要简单地将其写入MultipleOutput。这样你就可以去掉任何内存中的TreeMap。

我建议你应该注意的几点是:

  • 不要每次都在reduce方法中创建new MultipleOutputs,而是应该编写一个setup()方法,并在setup()方法中只创建一个
  • 不要每次都创建new Text(),而是在setup方法中创建一个,并通过Textset("string")方法重用同一个实例。你可以争辩说,这有什么意义,Java的GC无论如何都会垃圾收集。但是,您应该始终尽量使用尽可能低的内存,这样Java的垃圾回收就应该不那么频繁地被调用

相关内容

  • 没有找到相关文章

最新更新