合并hadoop中的小文件



我在HDFS中有一个目录(最终目录),其中一些文件(例如:10 mb)每分钟加载一次。过了一段时间,我想把所有的小文件合并成一个大文件(例如:100 mb)。但是用户不断地将文件推到最终目录。这是一个连续的过程。

因此,我第一次需要将前10个文件合并为一个大文件(例如:large.txt)并将文件保存到Finaldir。

现在我的问题是我将如何得到接下来的10个文件,不包括前10个文件?

有人能帮我一下吗

这是另一种选择,这仍然是@Andrew在他的评论中指出的传统方法,但是增加了额外的步骤,使您的输入文件夹作为缓冲区来接收小文件,及时将它们推入tmp目录并合并它们并将结果推回输入。

步骤1:创建TMP目录

hadoop fs -mkdir tmp

步骤2:将所有小文件移动到TMP目录

hadoop fs -mv input/*.txt tmp

步骤3 -在hadoop-streaming jar的帮助下合并小文件

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar 
                   -Dmapred.reduce.tasks=1 
                   -input "/user/abc/input" 
                   -output "/user/abc/output" 
                   -mapper cat 
                   -reducer cat

步骤4-将输出移动到输入文件夹

hadoop fs -mv output/part-00000 input/large_file.txt

步骤5 -删除输出

 hadoop fs -rm -R output/

步骤6 -删除所有文件从tmp

hadoop fs -rm tmp/*.txt

从步骤2到步骤6创建一个shell脚本,并安排它定期运行,以定期合并较小的文件(根据您的需要,可以每分钟一次)

为合并小文件调度cron作业的步骤

步骤1:使用上述步骤(2到6)创建shell脚本/home/abc/mergejob.sh

重要提示:你需要在脚本中指定hadoop的绝对路径,以便cron能够理解

#!/bin/bash
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv input/*.txt tmp
wait
/home/abc/hadoop-2.6.0/bin/hadoop jar /home/abc/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar 
                   -Dmapred.reduce.tasks=1 
                   -input "/user/abc/input" 
                   -output "/user/abc/output" 
                   -mapper cat 
                   -reducer cat
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv output/part-00000 input/large_file.txt
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm -R output/
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm tmp/*.txt

第2步:使用cron表达式安排脚本每分钟运行一次

a)通过选择编辑器编辑crontab

>crontab -e

b)在末尾添加以下行,并退出编辑器

* * * * * /bin/bash /home/abc/mergejob.sh > /dev/null 2>&1

合并作业将计划每分钟运行一次。

希望对大家有帮助。

@Andrew指出,在6年前面向批处理的世界中,合适的解决方案。
但现在是2016年,你有一个微批数据流正在运行,需要一个非阻塞解决方案。

我就是这么做的:

  • 创建一个带有3个分区的EXTERNAL表,映射到3个目录上如new_datareorghistory
  • 将新文件输入new_data
  • 实现一个作业来运行批处理压缩,并定期运行

现在批量压缩逻辑:

  1. 确保在压缩运行时不执行SELECT查询,否则将返回重复项
  2. 选择适合压缩的所有文件(自定义)new_data目录移动到reorg
  3. 合并所有这些reorg文件的内容,到history目录下的新文件(随时随地GZip它,Hive会识别.gz扩展名)
  4. 删除reorg
  5. 中的文件

所以它基本上是旧的2010故事,除了您现有的数据流可以继续转储新文件到new_data,而压缩是安全地在单独的目录中运行。在压缩作业崩溃的情况下,您可以安全地调查/清理/恢复压缩,而不会影响数据流。


顺便说一下,我不太喜欢2010年基于"Hadoop Streaming"任务的解决方案——一方面,"Streaming"现在有了非常不同的含义;另一方面,"Hadoop流"在过去很有用,但现在已经淡出了人们的视野;在紧握的手[*]上,你可以很简单地使用Hive查询,例如:

INSERT INTO TABLE blahblah PARTITION (stage='history')
SELECT a, b, c, d
FROM blahblah
WHERE stage='reorg'
;

在查询之前使用一对SET some.property = somevalue,您可以定义将在结果文件上应用哪种压缩编解码器,您想要多少文件(或者更准确地说,您想要的文件有多大- Hive将相应地运行合并),等等。

查看https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties下的hive.merge.mapfileshive.merge.mapredfiles(或hive.merge.tezfiles,如果你使用TEZ)和hive.merge.smallfiles.avgsize,然后hive.exec.compress.outputmapreduce.output.fileoutputformat.compress.codec -加上hive.hadoop.supports.splittable.combineinputformat,以减少映射容器的数量,因为你的输入文件相当小。


[*] 非常古老的科幻参考: -)

相关内容

  • 没有找到相关文章

最新更新