Python中经典Hadoop字数示例中的数据流



我正在尝试理解Python中的Hadoop字数示例http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

作者从映射器和reducer的天真版本开始。这是减速器(为了简洁起见,我删除了一些注释)

#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%st%s' % (current_word, current_count)
        current_count = count
        current_word = word
# do not forget to output the last word if needed!
if current_word == word:
    print '%st%s' % (current_word, current_count)

作者用测试程序

echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py

因此,编写reducer时,就好像reducer作业的输入数据是这样的:

aa 1
aa 1
bb 1
cc 1
cc 1
cc 1

我对reducer最初的理解是,给定reducer的输入数据将包含一个唯一的键。因此,在前面的例子中,需要3个减速器工作。我的理解有误吗?

然后,作者介绍了映射器和减缩器的改进版本。这是减速器:

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='t'):
    for line in file:
        yield line.rstrip().split(separator, 1)
def main(separator='t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass
if __name__ == "__main__":
    main()

作者添加了以下警告:

注意:以下Map和Reduce脚本只能"正确"工作在Hadoop上下文中运行时,即作为MapReduce作业。这意味着运行天真的测试命令"cat数据|/mapper.py|排序-k1,1|/reductor.py"将不起作用因为某些功能是有意的外包给Hadoop。

我不明白为什么天真的测试命令不适用于新版本。我认为sort -k1,1的使用将为两个版本的减速器产生相同的输入。我错过了什么?

关于你的第一个问题:"我最初对reducer的理解是,给定reducer输入数据将包含一个唯一的键。所以在前面的例子中,需要3个reducer作业。我的理解不正确吗?"

MapReduce抽象和Hadoop对该抽象的实现之间存在差异。在抽象中,reducer与一个唯一的密钥相关联。另一方面,Hadoop实现将多个密钥分配给同一个reducer(以避免关闭进程和启动新进程的成本)。特别是,在Hadoop流中,reducer接收与一定数量的键(可以是零、一个或多个键)相对应的键值对,但您可以保证与某个键相关的键值对将连续出现。

例如,让我们以单词计数为例,输入"foo foo quux labs foo bar quux"。然后可能是一个reducer接收到输入"bar 1\nfoo 1\nfoo 1\nfo1",而另一个reductor接收到"labs 1\nqiux 1\nqiux 1"。实际运行的还原器进程的数量由您使用选项mapred.reduce.tasks决定。例如,要使用2个还原器,您可以执行

 $ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=2 -mapper ....

关于你的第二个问题,我同意你的看法,sort -k1,1会起作用,所以我也不认为有问题。

相关内容

  • 没有找到相关文章

最新更新