Hadoop mapreduce中的详细数据流?



我有点难以理解mapreduce中的数据流。最近,当我的磁盘在减少阶段内存不足时,一项要求很高的作业崩溃了。我发现很难估计我的工作需要多少磁盘。我将详细描述数据流。

如果有人能纠正、详细说明mapreduce中的数据流,或者就确定我的系统的尺寸提供建议,那将是很有帮助的

集群配置:

我有一个集群,包含30个带有的从机

  • 12 GB RAM
  • 100 GB硬盘
  • 4芯

我的映射任务与wordcount非常相似,因此它们只需要很少的内存。我的reduce任务使用单词的排列组。由于需要连接相同的单词,reduce函数需要临时散列映射,该散列映射总是<=3GB。

由于我有12GB的RAM,而我的hadoop守护进程需要1GB的堆+500MB的操作系统,我将map/reduce插槽划分如下:

4个映射插槽和900MB堆

2个减少插槽和3GB堆我的工作有1800个地图任务

,每个任务生成8 GB的地图输出。由于我使用BZIP2进行压缩,因此可以将其压缩到1GB。这意味着映射的总输出将低于2 TB,而我有3 TB的内存。

我选择了100个reduce任务,每个任务产生5 GB的输出。

乍一看,一切都应该记忆深刻。但显然,排序阶段需要压缩和解压缩,而复制阶段需要数据同时位于两个位置(我假设)。所以这就是它变得棘手的地方,这就是为什么我想完全理解数据流。这就是我认为的工作方式,但如果我错了,请纠正我:

数据流

映射任务生成许多溢出(在我的情况下为200),这些溢出在内存中被排序,然后在写入本地磁盘之前被压缩。映射任务完成后,将为我提供200个溢出文件,这些文件被合并每10个(io.sort.factor)。这意味着10个文件被解压缩:10 x(5MB->40MB),因此这会产生0.4GB的压缩/解压缩开销。尽管我不确定200起泄漏事件第一轮合并后会发生什么。我想每个reduce任务都会先洗牌?所以这些文件的大小不会增加太多。如果我们从黑盒的角度来看,这意味着我们从200个压缩溢出开始,最终为reduce任务(每个任务1个)提供了100个压缩文件。

由于我只有60个减速器,现在每个节点有60个压缩文件被复制到减速器,这已经在映射阶段完成了。这可能意味着压缩文件暂时存在于源和目标上。这意味着在这种情况下,每个节点的内存需求(暂时)增加了160个压缩文件,这是映射输出的1.6倍。映射输出为1800 GB因此,我们最终得到了2880 GB,尽管是暂时的。因此,第一个减少阶段应该能够开始,而且确实如此。复制(我希望!)后,数据将从映射程序本地输出目录中删除,因此我们的数据量与映射输出的数据量相同,再次为1800 GB。

现在减速器中的排序阶段开始了。我希望在映射器的内存被清除之前不要开始?!由于要合并1800个地图任务的输出,必须对其进行解压缩。reduce任务的输入大约是mapoutput/100=18 GB的压缩数据。现在是如何解压缩的,它不能一次全部解压缩,因为那时我每个节点有144GB,而且由于我的工作没有崩溃,所以解压缩的执行稍微明智一些。我的想法与映射阶段相同:10个文件(1800个任务输出)被解压缩并同时合并。然后,解压缩将给出18GB/180=100MB的每个合并回合的开销。问题再次是最后一轮合并是如何进行的,我记得在hadoop引用中读到,在只剩下一个文件之前,reducers不会继续合并。

在减少阶段中排序之后,减少阶段运行,这需要对输入记录进行解压缩,但由于每个减少任务都使用500个输入密钥组,因此这应该不是真正的问题。

如前所述,reduce任务向DFS产生大约5GB的输出(总计0.5TB)。

在前60个减少任务完成后,这项工作真的会遇到麻烦。在第二轮中,任务在排序阶段开始崩溃,这让我认为这与复制开销或解压缩开销有关。

确切的例外是:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3

我希望我充分详细地解释了我的程序流程以及我对mapreduce的理解。如果

  • 有人可以清除有关复制阶段和合并阶段的烟雾
  • 以及为克服就业危机提供建议
  • 对我来说,能够准确估计我需要多少内存是理想的,因为如果我尝试一个具有40个节点的集群在运行5天后崩溃(就像这次经历的那样),那将是不愉快的,因为截止日期越来越近了

提前感谢

我工作失败的原因如下:

异常1:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)

异常2:

FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)

异常3:(可能由磁盘检查器异常引起)

Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!

我刚收到Praveen Sripati的一封电子邮件,提到hadoop引用,我会把它粘贴到这里:

在复制阶段,map和reduce任务中都存在数据吗?何时清除地图输出

以下内容来自Hadoop-最终指南

主机不会在第一个减速器之后立即从磁盘中删除映射输出已检索到它们,因为减速器可能随后出现故障。相反他们一直等到jobtracker告诉他们删除他们(或者应用程序主机),这是在作业已经完成之后。

这一点非常重要,映射输出保留在磁盘上!!对我来说有点不幸。

5)然后减速器中的合并开始。不完全确定是怎么做的。它是否会合并为每个reduce键一个文件?还是将所有内容合并为一项任务

再次来自同一本书

复制完所有映射输出后,reduce任务进入排序阶段(应该适当地称为合并阶段,如排序是在地图一侧进行的),它合并了地图输出,保持其排序顺序。这是分回合进行的。对于例如,如果有50个映射输出,并且合并因子为10(默认值,由io.sort.factor属性控制,就像地图的合并),将有五轮。每轮合并10文件合并为一个,所以最后会有五个中间文件。

谢谢,Praveen

这意味着合并后的文件数量限制为io.sort.factor。在我的情况下,有10个段,每个段1.8GB。在上一次合并期间,所有内容都必须解压缩,因此每轮需要1.8*10 GB=18 GB。

相关内容

  • 没有找到相关文章

最新更新