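I've just started using mrjob (MapReduce for Python) and I'm new to the MapReduce paradigm. I'd like to understand the word_count.py tutorial on the mrjob documentation site.
The documentation says that if we create word_count.py and run it on a text file, it counts and returns the number of lines, characters, and words in that file. Here is the code they use for word_count.py: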
from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
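To make the data flow concrete, here is a minimal pure-Python simulation of what happens around a job like this one, assuming no Hadoop and no mrjob at all: each line is handed to the mapper separately, the emitted pairs are sorted and grouped by key (the "shuffle"), and each key's values are passed to the reducer. The names `simulate_job`, `mapper`, and `reducer` are hypothetical stand-ins for illustration, not mrjob's API.

```python
from itertools import groupby
from operator import itemgetter


def mapper(_, line):
    # Same logic as MRWordFrequencyCount.mapper, as a plain function.
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1


def reducer(key, values):
    # Same logic as MRWordFrequencyCount.reducer.
    yield key, sum(values)


def simulate_job(text):
    """Hypothetical local simulation of map -> shuffle/sort -> reduce."""
    # Map phase: the mapper is called once per line, never on the whole file.
    mapped = []
    for line in text.splitlines():
        mapped.extend(mapper(None, line))

    # Shuffle/sort phase: bring all pairs with the same key together.
    mapped.sort(key=itemgetter(0))

    # Reduce phase: the reducer gets one key plus an iterator over its values.
    results = {}
    for key, group in groupby(mapped, key=itemgetter(0)):
        for out_key, out_value in reducer(key, (value for _, value in group)):
            results[out_key] = out_value
    return results


print(simulate_job("hello world\nfoo bar baz\n"))
# {'chars': 22, 'lines': 2, 'words': 5}
```

Note that the newline is not counted in `chars`, because `splitlines()` drops it; the real job behaves similarly since the mrjob input code strips the trailing newline before calling the mapper.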
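I understand that we extend the MRJob class and override the mapper and reducer methods. What I don't get is that at execution time we pass in the entire text file, like this: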
python word_count.py entire_text_file.txt
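So how does the mapper know to process one line at a time? Basically, my question is: in this case, what is the input to the mapper() function defined above? Is it the contents of the whole file, or one line at a time? If it is one line, which part of the mrjob code is responsible for feeding mapper() one line at a time? I hope I've made my otherwise vague question a bit less vague, but this has me completely stumped. Any help would be greatly appreciated.
Thanks in advance!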
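Well, I guess the best answer is RTFC (read the code) :P
If you look at /usr/lib/python2.6/site-packages/mrjob/job.py (assuming you installed mrjob with pip on Python 2.6), you'll see exactly how it reads lines from the input and runs the mapper once for each line: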
def run_mapper(self, step_num=0):
    ...
    # pick input and output protocol
    read_lines, write_line = self._wrap_protocols(step_num, 'mapper')

    if mapper_init:
        for out_key, out_value in mapper_init() or ():
            write_line(out_key, out_value)

    # run the mapper on each line
    for key, value in read_lines():
        for out_key, out_value in mapper(key, value) or ():
            write_line(out_key, out_value)

    if mapper_final:
        for out_key, out_value in mapper_final() or ():
            write_line(out_key, out_value)
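Stripped of the protocol machinery, the loop in run_mapper boils down to something like the following Python 3 sketch. It assumes the default case where each raw input line arrives as the value with a `None` key (which is how mrjob's default input protocol, RawValueProtocol, behaves), and it writes each output pair as JSON key, tab, JSON value, similar in spirit to mrjob's JSONProtocol. `run_mapper_sketch` and its arguments are hypothetical names, not mrjob's API.

```python
import io
import json


def run_mapper_sketch(mapper, stdin, stdout, mapper_init=None, mapper_final=None):
    """Hypothetical re-creation of the run_mapper loop: one mapper call per line."""

    def write_line(key, value):
        # Encode each pair as JSON key, a tab, then JSON value (assumed format).
        stdout.write(json.dumps(key) + "\t" + json.dumps(value) + "\n")

    if mapper_init:
        for out_key, out_value in mapper_init() or ():
            write_line(out_key, out_value)

    # This loop is what feeds the mapper one line at a time.
    for line in stdin:
        value = line.rstrip("\r\n")  # drop the line ending, keep other whitespace
        for out_key, out_value in mapper(None, value) or ():
            write_line(out_key, out_value)

    if mapper_final:
        for out_key, out_value in mapper_final() or ():
            write_line(out_key, out_value)


def mapper(_, line):
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1


out = io.StringIO()
run_mapper_sketch(mapper, io.StringIO("hello world\nfoo\n"), out)
print(out.getvalue())
```

Each of the two input lines produces three output lines, six in total; Hadoop (or a local runner) then sorts them by key before any reducer runs.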
And here is where read_lines is defined:
def _wrap_protocols(self, step_num, step_type):
    """Pick the protocol classes to use for reading and writing
    for the given step, and wrap them so that bad input and output
    trigger a counter rather than an exception unless --strict-protocols
    is set.

    Returns a tuple of ``(read_lines, write_line)``

    ``read_lines()`` is a function that reads lines from input, decodes
    them, and yields key, value pairs.

    ``write_line()`` is a function that takes key and value as args,
    encodes them, and writes a line to output.

    :param step_num: which step to run (e.g. 0)
    :param step_type: ``'mapper'``, ``'reducer'``, or ``'combiner'`` from
                      :py:mod:`mrjob.step`
    """
    read, write = self.pick_protocols(step_num, step_type)

    def read_lines():
        for line in self._read_input():
            try:
                key, value = read(line.rstrip('\r\n'))
                yield key, value
            except Exception, e:
                if self.options.strict_protocols:
                    raise
                else:
                    self.increment_counter(
                        'Undecodable input', e.__class__.__name__)

    def write_line(key, value):
        try:
            print >> self.stdout, write(key, value)
        except Exception, e:
            if self.options.strict_protocols:
                raise
            else:
                self.increment_counter(
                    'Unencodable output', e.__class__.__name__)

    return read_lines, write_line
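The key idea in `_wrap_protocols` is that a decoding failure on a single line bumps a counter instead of killing the whole task, unless strict mode is on. A self-contained Python 3 sketch of that pattern (the excerpt from mrjob is Python 2 syntax) might look like this; `wrap_reader`, `json_read`, and the `counters` dict are hypothetical stand-ins for mrjob's protocol objects and `increment_counter`.

```python
import json
from collections import Counter


def json_read(line):
    """Hypothetical protocol decoder: 'JSON key<TAB>JSON value' -> (key, value)."""
    raw_key, raw_value = line.split("\t", 1)
    return json.loads(raw_key), json.loads(raw_value)


def wrap_reader(read, lines, counters, strict=False):
    """Yield decoded (key, value) pairs, counting undecodable lines unless strict."""
    for line in lines:
        try:
            key, value = read(line.rstrip("\r\n"))
        except Exception as e:
            if strict:
                raise
            # Mirrors increment_counter('Undecodable input', e.__class__.__name__).
            counters[("Undecodable input", e.__class__.__name__)] += 1
            continue
        yield key, value


counters = Counter()
lines = ['"words"\t2\n', "not json at all\n", '"lines"\t1\n']
print(list(wrap_reader(json_read, lines, counters)))  # [('words', 2), ('lines', 1)]
print(dict(counters))  # {('Undecodable input', 'ValueError'): 1}
```

The bad middle line is skipped and recorded under its exception class name, which is the same bookkeeping the mrjob excerpt does with its counters.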
Finally, you can look at read_input and read_file in /usr/lib/python2.6/site-packages/mrjob/util.py to see how the input files themselves are read. Hope this helps!
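As a rough idea of what a read_input-style helper does, here is a hypothetical Python 3 sketch: it takes a list of paths, treats `-` as standard input, transparently decompresses `.gz` files, and yields lines one by one. This is an assumption-laden illustration, not mrjob's actual `read_input` signature or behavior; `read_input_sketch` is a made-up name.

```python
import gzip
import os
import sys
import tempfile


def read_input_sketch(paths, stdin=None):
    """Hypothetical helper: yield lines from each path in order.

    '-' means standard input; paths ending in '.gz' are decompressed on the fly.
    """
    stdin = stdin if stdin is not None else sys.stdin
    for path in paths:
        if path == "-":
            yield from stdin
        elif path.endswith(".gz"):
            with gzip.open(path, "rt") as f:
                yield from f
        else:
            with open(path) as f:
                yield from f


# Usage: one plain file and one gzip file, read as a single stream of lines.
with tempfile.TemporaryDirectory() as tmp:
    plain = os.path.join(tmp, "a.txt")
    packed = os.path.join(tmp, "b.txt.gz")
    with open(plain, "w") as f:
        f.write("hello world\n")
    with gzip.open(packed, "wt") as f:
        f.write("foo bar\nbaz\n")
    lines = list(read_input_sketch([plain, packed]))

print(lines)  # ['hello world\n', 'foo bar\n', 'baz\n']
```

Because it is a generator, lines are produced lazily, so a consumer like the mapper loop never needs the whole file in memory at once.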