我是Hadoop mrjob的新手。我有一个文本文件;id groupId值";在每一行中。我正在尝试使用Hadoop映射reduce计算文本文件中所有值的中值。但当只计算中值时,我就陷入了困境。我得到的是每个id的中值,比如:
"123213" 5.0
"123218" 2
"231532" 1
"234634" 7
"234654" 2
"345345" 9
"345445" 4.5
"345645" 2
"346324" 2
"436324" 6
"436456" 2
"674576" 10
"781623" 1.5
输出应该类似于";所有值的中值为"###";。我被这篇文章影响了https://computehustle.com/2019/09/02/getting-started-with-mapreduce-in-python/我的python文件中值mrjob.py:
from mrjob.job import MRJob
from mrjob.step import MRStep
class MRMedian(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_stats, combiner=self.reducer_count_stats),
MRStep(reducer=self.reducer_sort_by_values),
MRStep(reducer=self.reducer_retrieve_median)
]
def mapper_get_stats(self, _, line):
line_arr = line.split(" ")
values = int(float(line_arr[-1]))
id = line_arr[0]
yield id, values
def reducer_count_stats(self, key, values):
yield str(sum(values)).zfill(2), key
def reducer_sort_by_values(self, values, ids):
for id in ids:
yield id, values
def reducer_retrieve_median(self, id, values):
valList=[]
median = 0
for val in values:
valList.append(int(val))
N = len(valList)
#find the median
if N % 2 == 0:
#if N is even
m1 = N / 2
m2 = (N / 2) + 1
#Convert to integer, match post
m1 = int(m1) - 1
m2 = int(m2) - 1
median = (valList[m1] + valList[m2]) / 2
else:
m = (N + 1) / 2
# Convert to integer, match position
m = int(m) - 1
median = valList[m]
yield (id, median)
if __name__ == '__main__':
MRMedian.run()
我的原始文本文件大约有100万到10亿行数据,但我创建了一个包含任意数据的测试文件。它的名称input.txt:
781623 2 2.3243
781623 1 1.1243
234654 1 2.122
123218 8 2.1245
436456 22 2.26346
436324 3 6.6667
346324 8 2.123
674576 1 10.1232
345345 1 9.56135
345645 7 2.1231
345445 10 6.1232
231532 1 1.1232
234634 6 7.124
345445 6 3.654376
123213 18 8.123
123213 2 2.1232
我关心的是价值观。考虑到这可能是重复的。我在终端中运行命令行来运行代码python median-mrjob.py input.txt
更新:任务的重点是不使用任何库,所以我需要手动排序列表(或者根据我的理解,可能是其中的一些(,并手动计算中值(硬编码(。否则,使用MapReduce的目标将消失。不允许在此分配中使用PySpark。查看此链接以获得更多灵感计算地图中的中值减少
输出应该像"所有值的中值为:;
然后,您需要先将所有数据强制到一个reducer(有效地挫败了使用MapReduce的目的(。
你可以通过不使用ID作为密钥并将其丢弃来做到这一点
def mapper_get_stats(self, _, line):
line_arr = line.split()
if line_arr: # prevent empty lines
value = float(line_arr[-1])
yield None, value
之后,排序并找到中位数(我固定了您的参数顺序(
def reducer_retrieve_median(self, key, values):
import statistics
yield None, f"median value of all values is: {statistics.median(values)}" # automatically sorts the data
所以,只有两步
class MRMedian(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_stats),
MRStep(reducer=self.reducer_retrieve_median)
]
对于给定的文件,您应该看到
null "median value of all values is: 2.2938799999999997"
原始文本文件大约有100万和10亿行数据
这并不重要,但它是什么?
你应该先把文件上传到HDFS,然后你可以使用比MrJob更好的工具,比如Hive或Pig。