Shuffle, merge, and fetcher errors when processing large files in Hadoop

I am running a word-count-like MapReduce job over 200 files of 1 GB each, on a Hadoop cluster of 4 datanodes (2 CPUs each) with 8 GB of memory and about 200 GB of space. Whatever configuration options I try, the job fails every time with an InMemory Shuffle, OnDisk Shuffle, InMemory Merge, OnDisk Merge, or Fetcher error.

The size of the mapper output is comparable to the size of the input files, so to minimize it I enabled BZip2 compression for the map output. Even with compressed map output, however, I still get errors in the reducer phase. I use 4 reducers. I have therefore tried various configurations of the Hadoop cluster:
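
For reference, a minimal sketch of enabling BZip2 map-output compression per job (the jar name, driver class, and HDFS paths are placeholders, and it assumes the driver uses ToolRunner so that -D options are picked up):

    # Placeholder jar/class/paths; -D flags need a ToolRunner-based driver.
    hadoop jar wordcount.jar WordCount \
        -D mapreduce.map.output.compress=true \
        -D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.BZip2Codec \
        /user/me/input /user/me/output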

The standard configuration of the cluster was:

    Default virtual memory for a job's map-task      3328 Mb
    Default virtual memory for a job's reduce-task  6656 Mb
    Map-side sort buffer memory 205 Mb
    Mapreduce Log Dir Prefix    /var/log/hadoop-mapreduce
    Mapreduce PID Dir Prefix    /var/run/hadoop-mapreduce
    yarn.app.mapreduce.am.resource.mb   6656
    mapreduce.admin.map.child.java.opts -Djava.net.preferIPv4Stack=TRUE -Dhadoop.metrics.log.level=WARN
    mapreduce.admin.reduce.child.java.opts  -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
    mapreduce.admin.user.env LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/`$JAVA_HOME/bin/java -d32 -version &> /dev/null;if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`
    mapreduce.am.max-attempts   2
    mapreduce.application.classpath $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
    mapreduce.cluster.administrators    hadoop
    mapreduce.framework.name    yarn
    mapreduce.job.reduce.slowstart.completedmaps   0.05
    mapreduce.jobhistory.address    ip-XXXX.compute.internal:10020
    mapreduce.jobhistory.done-dir   /mr-history/done
    mapreduce.jobhistory.intermediate-done-dir  /mr-history/tmp
    mapreduce.jobhistory.webapp.address ip-XXXX.compute.internal:19888
    mapreduce.map.java.opts -Xmx2662m
    mapreduce.map.log.level INFO
    mapreduce.map.output.compress   true
    mapreduce.map.sort.spill.percent    0.7
    mapreduce.map.speculative   false
    mapreduce.output.fileoutputformat.compress  true
    mapreduce.output.fileoutputformat.compress.type BLOCK
    mapreduce.reduce.input.buffer.percent   0.0
    mapreduce.reduce.java.opts  -Xmx5325m
    mapreduce.reduce.log.level  INFO
    mapreduce.reduce.shuffle.input.buffer.percent 0.7
    mapreduce.reduce.shuffle.merge.percent  0.66
    mapreduce.reduce.shuffle.parallelcopies 30
    mapreduce.reduce.speculative    false
    mapreduce.shuffle.port  13562
    mapreduce.task.io.sort.factor   100
    mapreduce.task.timeout  300000
    yarn.app.mapreduce.am.admin-command-opts    -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
    yarn.app.mapreduce.am.command-opts  -Xmx5325m
    yarn.app.mapreduce.am.log.level INFO
    yarn.app.mapreduce.am.staging-dir   /user
    mapreduce.map.maxattempts       4
    mapreduce.reduce.maxattempts        4

This configuration gave me the following error:

14/05/16 20:20:05 INFO mapreduce.Job:  map 20% reduce 3%
14/05/16 20:27:13 INFO mapreduce.Job:  map 20% reduce 0%
14/05/16 20:27:13 INFO mapreduce.Job: Task Id : attempt_1399989158376_0049_r_000000_0,      Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in InMemoryMerger - Thread to merge in-memory shuffled map-outputs
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1399989158376_0049_r_000000_0/map_2038.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$InMemoryMerger.merge(MergeManagerImpl.java:450)
    at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)

I then tried changing various options, hoping to reduce the load during the shuffle phase, but I got the same error:

mapreduce.reduce.shuffle.parallelcopies     5
mapreduce.task.io.sort.factor   10

mapreduce.reduce.shuffle.parallelcopies     10
mapreduce.task.io.sort.factor   20

Then I realized that the tmp directories on my datanodes did not exist, so all the merging and shuffling was happening in memory; I therefore added them manually on each datanode. I kept the initial configuration but increased the delay before the reducers start, to limit the load on the datanodes:

mapreduce.job.reduce.slowstart.completedmaps 0.7

I also tried increasing io.sort.mb:

mapreduce.task.io.sort.mb from 205 to 512. 
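
Both settings can also be overridden per job on the command line, with the same caveats as the compression sketch above (placeholder jar, class, and paths; a ToolRunner-based driver is assumed):

    # Same placeholder invocation, overriding the two settings per job:
    hadoop jar wordcount.jar WordCount \
        -D mapreduce.job.reduce.slowstart.completedmaps=0.7 \
        -D mapreduce.task.io.sort.mb=512 \
        /user/me/input /user/me/output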

But now I get the following OnDisk error:

14/05/26 12:17:08 INFO mapreduce.Job:  map 62% reduce 21%
14/05/26 12:20:13 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_0, Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in OnDiskMerger - Thread to merge on-disk map-outputs
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for hadoop/yarn/local/usercache/eoc21/appcache/application_1400958508328_0021/output/attempt_1400958508328_0021_r_000000_0/map_590.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$OnDiskMerger.merge(MergeManagerImpl.java:536)
at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)

The reducer dropped back to 0%, and when it reached 17% again I got the following error:

14/05/26 12:32:03 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_1, Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#22
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
at org.apache.hadoop.mapreduce.task.reduce.OnDiskMapOutput.<init>(OnDiskMapOutput.java:61)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:257)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

I have read around, and it seems that "Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out" is associated with the node not having enough space to spill. However, I checked the datanode and there seems to be enough space:

Filesystem      Size  Used Avail Use% Mounted on
/dev/xvde1       40G   22G   18G  56% /
none            3.6G     0  3.6G   0% /dev/shm
/dev/xvdj      1008G  758G  199G  80% /hadoop/hdfs/data

So I am not sure what to try anymore. Is the cluster simply too small to handle such jobs? Do I need more space on the datanodes? Is there a way to find an optimal configuration for a job on Hadoop? Any suggestion is highly appreciated!

This could be one of four things that I know of, most likely the point you raised in your question about disk space, or a closely related one, inodes:

  • The file was deleted by another process (unlikely, unless you remember doing it yourself)
  • A disk error (unlikely)
  • Not enough disk space
  • Not enough inodes (run df -i)

Even if you run df -h and df -i before/after the job, you don't know how much gets eaten up and cleaned out during the job. So while your job is running, the suggestion is to watch these numbers, log them to a file, graph them, etc. E.g.

watch "df -h && df -i"

You need to specify some temp directories where the intermediate map and reduce output is stored. It may be that you have not specified any temp directories, so it could not find any valid directory to store the intermediate data. You can do this by editing mapred-site.xml:

<property>
  <name>mapred.local.dir</name>
  <value>/temp1,/temp2,/temp3</value>
</property>

A comma-separated list of paths on the local filesystem where temporary MapReduce data is written. Multiple paths spread out disk I/O.
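
Note that these directories must already exist and be writable by the user the tasks run as; a sketch of setting them up (the yarn:hadoop ownership is an assumption, adjust to your installation):

    # On each node: create the temp dirs listed in mapred.local.dir and
    # hand them to the task user (yarn:hadoop ownership is an assumption).
    sudo mkdir -p /temp1 /temp2 /temp3
    sudo chown -R yarn:hadoop /temp1 /temp2 /temp3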

Once these temp directories are specified, the intermediate map and reduce output is stored by choosing among them in one of the following ways:

random: the intermediate data for reduce tasks is stored at a randomly chosen data location.

max: the intermediate data for reduce tasks is stored at the data location with the most available space.

roundrobin: the mappers and reducers pick disks via round-robin scheduling for storing intermediate data at the job level, across the available local disks. The job ID is used to create unique subdirectories on the local disks to store the intermediate data of each job.

You can set this property in mapred-site.xml, for example:

<property>
  <name>mapreduce.job.local.dir.locator</name>
  <value>max</value>
</property>

Hadoop defaults to roundrobin.

mapreduce.cluster.local.dir (older deprecated name: mapred.local.dir) is specified in mapred-site.xml.
