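I am trying to write a MapReduce job in Python. I have a file containing product information, and I want to count the number of products that belong to the same category and have the same version, like this: <category, {count, version}>
My file looks like this: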
product_name rate category id version
a "3.0" cat1 1 1
b "2.0" cat1 2 1
c "4.0" cat1 3 4
d "1.0" cat2 3 2
. . . . .
. . . . .
. . . . .
For example:
<cat1, {2, 1} >
I wrote this code, but I don't know how to count them in the combiner function.
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRFrequencyCount(MRJob):

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper_extract,
                combiner=self.combine_counts,
            )
        ]

    def mapper_extract(self, _, line):
        (product_name, rate, category, id, version) = line.split('*')
        yield category, (1, version)

    def combine_counts(self, category, countAndVersion):
        yield category, sum(countAndVersion)

if __name__ == '__main__':
    MRFrequencyCount.run()
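The problem is the key you are creating. Since you are essentially grouping by category and version, you should send both as a composite key to the combiner function. The reducer can then break the composite key apart and emit the data in the desired format.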
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRFrequencyCount(MRJob):

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper_extract,
                combiner=self.combine_counts,
                reducer=self.reduce_counts
            )
        ]

    def mapper_extract(self, _, line):
        # Each record is '*'-delimited: product_name*rate*category*id*version
        (product_name, rate, category, product_id, version) = line.split('*')
        # Group by the composite key (category, version)
        yield (category, version), 1

    def combine_counts(self, cat_version, counts):
        # Partial sum per composite key; the key is passed through unchanged
        yield cat_version, sum(counts)

    def reduce_counts(self, cat_version, counts):
        # Break the composite key apart and emit <category, (count, version)>
        category, version = cat_version
        final = sum(counts)
        yield category, (final, version)

if __name__ == '__main__':
    MRFrequencyCount.run()
a*3.0*cat1*1*1
b*2.0*cat1*2*1
c*4.0*cat1*3*4
d*1.0*cat2*3*2
"cat2" [1, "2"]
"cat1" [1, "4"]
"cat1" [2, "1"]
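To check the composite-key idea without running mrjob, the same grouping can be sketched in plain Python. This is only an illustration under assumptions: the helper names map_line and count_by_category_and_version are hypothetical, the input is assumed to be '*'-delimited as in the sample above, and a dictionary stands in for the shuffle and reduce phases.

```python
from collections import defaultdict

def map_line(line):
    """Emit ((category, version), 1) for one '*'-delimited record."""
    product_name, rate, category, product_id, version = line.strip().split('*')
    return (category, version), 1

def count_by_category_and_version(lines):
    """Sum the ones per composite key, as the combiner and reducer would."""
    totals = defaultdict(int)
    for line in lines:
        key, one = map_line(line)
        totals[key] += one
    # Reshape into the <category, (count, version)> form asked for in the question.
    return sorted(
        (category, (count, version))
        for (category, version), count in totals.items()
    )

sample = ["a*3.0*cat1*1*1", "b*2.0*cat1*2*1", "c*4.0*cat1*3*4", "d*1.0*cat2*3*2"]
results = count_by_category_and_version(sample)
for category, value in results:
    print(category, value)
```

Because the count is keyed on (category, version) rather than on category alone, products a and b land in the same bucket while c gets its own, which matches the job output shown above.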