合并MapReduce作业的输出文件



我用Python编写了一个Mapper和Reducer,并使用Hadoop流在亚马逊的Elastic MapReduce(EMR)上成功执行了它。

最终结果文件夹包含三个不同文件part-000000、part-00001和part-00002中的输出。但我需要将输出作为一个单独的文件。有什么办法我能做到吗?

这是我的Mapper代码:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%st%s' % (word, 1)

这是我的减缩代码

#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
max_count=0
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
if current_word == word:
    current_count += count
else:
    if current_word:
        # write result to STDOUT
            if current_word[0] != '@':
                print '%st%d' % (current_word, current_count)
                if count > max_count:
                    max_count = count
    current_count = count
    current_word = word
if current_word == word:
    print '%st%s' % (current_word, current_count)

我需要将其作为一个单独的文件输出。

一种非常简单的方法(假设Linux/UNIX系统):

$ cat part-00000 part-00001 part-00002 > output

对小数据集/处理使用单个reduce,或对作业的输出文件使用getmerge选项。

我对上述问题的解决方案是执行以下hdfs命令:

hadoop fs -getmerge /hdfs/path local_file

其中/hdfs/path是包含作业输出的所有部分(part-*****)的路径。hadoopfs的-getmerge选项将把所有作业输出合并到本地文件系统上的一个文件中。

我最近遇到了同样的问题,实际上组合器应该完成这项任务,但我无法以某种方式实现。我所做的是;

  1. 步骤1:映射器1.py还原器1.py

    输入:s3:///数据/

    输出s3:///小输出/

  2. 步骤2:映射器2.py还原器2.py

    输入s3:///数据/

    输出:s3:///输出2/

  3. 步骤3:映射器3.py还原器3.py

    输入:s3:///输出2/

    输出:s3:///最终输出/

我假设我们需要步骤1的输出作为步骤3的单个文件。

在mapper2.py的顶部,有这样的代码;

if not os.path.isfile('/tmp/s3_sync_flag'):
    os.system('touch /tmp/s3_sync_flag')
    [download files to /tmp/output/]
    os.system('cat /tmp/output/part* > /tmp/output/all')

if块,检查多个映射程序的执行情况。

相关内容

  • 没有找到相关文章