我在HDFS中有一个目录(最终目录),其中一些文件(例如:10 mb)每分钟加载一次。过了一段时间,我想把所有的小文件合并成一个大文件(例如:100 mb)。但是用户不断地将文件推到最终目录。这是一个连续的过程。
因此,我第一次需要将前10个文件合并为一个大文件(例如:large.txt)并将文件保存到Finaldir。
现在我的问题是我将如何得到接下来的10个文件,不包括前10个文件?
有人能帮我一下吗
这是另一种选择,这仍然是@Andrew在他的评论中指出的传统方法,但是增加了额外的步骤,使您的输入文件夹作为缓冲区来接收小文件,及时将它们推入tmp目录并合并它们并将结果推回输入。
步骤1:创建TMP目录
hadoop fs -mkdir tmp
步骤2:将所有小文件移动到TMP目录
hadoop fs -mv input/*.txt tmp
步骤3 -在hadoop-streaming jar的帮助下合并小文件
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
-Dmapred.reduce.tasks=1
-input "/user/abc/input"
-output "/user/abc/output"
-mapper cat
-reducer cat
步骤4-将输出移动到输入文件夹
hadoop fs -mv output/part-00000 input/large_file.txt
步骤5 -删除输出
hadoop fs -rm -R output/
步骤6 -删除所有文件从tmp
hadoop fs -rm tmp/*.txt
从步骤2到步骤6创建一个shell脚本,并安排它定期运行,以定期合并较小的文件(根据您的需要,可以每分钟一次)
为合并小文件调度cron作业的步骤
步骤1:使用上述步骤(2到6)创建shell脚本/home/abc/mergejob.sh
重要提示:你需要在脚本中指定hadoop的绝对路径,以便cron能够理解
#!/bin/bash
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv input/*.txt tmp
wait
/home/abc/hadoop-2.6.0/bin/hadoop jar /home/abc/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar
-Dmapred.reduce.tasks=1
-input "/user/abc/input"
-output "/user/abc/output"
-mapper cat
-reducer cat
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv output/part-00000 input/large_file.txt
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm -R output/
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm tmp/*.txt
第2步:使用cron表达式安排脚本每分钟运行一次
a)通过选择编辑器编辑crontab
>crontab -e
b)在末尾添加以下行,并退出编辑器
* * * * * /bin/bash /home/abc/mergejob.sh > /dev/null 2>&1
合并作业将计划每分钟运行一次。
希望对大家有帮助。
@Andrew指出,在6年前面向批处理的世界中,是合适的解决方案。
但现在是2016年,你有一个微批数据流正在运行,需要一个非阻塞解决方案。
我就是这么做的:
- 创建一个带有3个分区的EXTERNAL表,映射到3个目录上如
new_data
、reorg
、history
- 将新文件输入
new_data
- 实现一个作业来运行批处理压缩,并定期运行
现在批量压缩逻辑:
- 确保在压缩运行时不执行SELECT查询,否则将返回重复项
- 选择适合压缩的所有文件(自定义)和将从
new_data
目录移动到reorg
-
合并所有这些
reorg
文件的内容,到history
目录下的新文件(随时随地GZip它,Hive会识别.gz
扩展名) -
删除
reorg
中的文件
所以它基本上是旧的2010故事,除了您现有的数据流可以继续转储新文件到new_data
,而压缩是安全地在单独的目录中运行。在压缩作业崩溃的情况下,您可以安全地调查/清理/恢复压缩,而不会影响数据流。
顺便说一下,我不太喜欢2010年基于"Hadoop Streaming"任务的解决方案——一方面,"Streaming"现在有了非常不同的含义;另一方面,"Hadoop流"在过去很有用,但现在已经淡出了人们的视野;在紧握的手[*]上,你可以很简单地使用Hive查询,例如:
INSERT INTO TABLE blahblah PARTITION (stage='history')
SELECT a, b, c, d
FROM blahblah
WHERE stage='reorg'
;
在查询之前使用一对SET some.property = somevalue
,您可以定义将在结果文件上应用哪种压缩编解码器,您想要多少文件(或者更准确地说,您想要的文件有多大- Hive将相应地运行合并),等等。
查看https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties下的hive.merge.mapfiles
和hive.merge.mapredfiles
(或hive.merge.tezfiles
,如果你使用TEZ)和hive.merge.smallfiles.avgsize
,然后hive.exec.compress.output
和mapreduce.output.fileoutputformat.compress.codec
-加上hive.hadoop.supports.splittable.combineinputformat
,以减少映射容器的数量,因为你的输入文件相当小。
[*] 非常古老的科幻参考: -)