mrJob python mapReduce word_count.py



我刚刚开始使用mrJob(python的mapReduce),并且是MapReduce范式的新手,我想了解MRJob文档站点上存在的word_count.py教程。

文档说,如果我们创建一个word_count.py并使用一些文本文件运行它,它将计算并返回文本文件中的行、字符和单词的计数。以下是他们用于word_count.py的代码:

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1
    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()

在这里,我知道我们扩展了 MRJob 类并覆盖了映射器和化简器方法。但是我没有得到的是,在执行过程中,我们通过传递整个文本文件来执行,如下所示:

python  word_count.py  entire_text_file.txt

那么映射器如何知道如何一次解析一行呢?基本上我的问题是在这种情况下,上面定义的 mapper() 函数的输入是什么?它是整个文件的内容还是一次一行的内容。如果是单行,MRJob 代码的哪一部分负责一次向 mapper() 函数提供一行。希望我把我原本模糊的问题变得不那么模糊,但这让我完全被难住了。任何帮助将不胜感激。

提前感谢!

好吧,我想最好的答案是RTFC:P

如果你查看/usr/lib/python2.6/site-packages/mrjob/job.py(假设你在 python2.6 上安装了带有 pip 的 mrjob),你会发现它是如何准确地从输入中读取行并为每一行运行映射器

def run_mapper(self, step_num=0):
    ...
    # pick input and output protocol
    read_lines, write_line = self._wrap_protocols(step_num, 'mapper')
    if mapper_init:
        for out_key, out_value in mapper_init() or ():
            write_line(out_key, out_value)
    # run the mapper on each line
    for key, value in read_lines():
        for out_key, out_value in mapper(key, value) or ():
            write_line(out_key, out_value)
    if mapper_final:
        for out_key, out_value in mapper_final() or ():
            write_line(out_key, out_value)

这里定义了read_lines

def _wrap_protocols(self, step_num, step_type):
    """Pick the protocol classes to use for reading and writing
    for the given step, and wrap them so that bad input and output
    trigger a counter rather than an exception unless --strict-protocols
    is set.
    Returns a tuple of ``(read_lines, write_line)``
    ``read_lines()`` is a function that reads lines from input, decodes
        them, and yields key, value pairs.
    ``write_line()`` is a function that takes key and value as args,
        encodes them, and writes a line to output.
    :param step_num: which step to run (e.g. 0)
    :param step_type: ``'mapper'``, ``'reducer'``, or ``'combiner'`` from
                      :py:mod:`mrjob.step`
    """
    read, write = self.pick_protocols(step_num, step_type)
    def read_lines():
        for line in self._read_input():
            try:
                key, value = read(line.rstrip('rn'))
                yield key, value
            except Exception, e:
                if self.options.strict_protocols:
                    raise
                else:
                    self.increment_counter(
                        'Undecodable input', e.__class__.__name__)
    def write_line(key, value):
        try:
            print >> self.stdout, write(key, value)
        except Exception, e:
            if self.options.strict_protocols:
                raise
            else:
                self.increment_counter(
                    'Unencodable output', e.__class__.__name__)
    return read_lines, write_line

最终,您可以在/usr/lib/python2.6/site-packages/mrjob/util.py 中阅读read_input和read_file方法。 希望它有帮助

相关内容

  • 没有找到相关文章

最新更新