Word count in MapReduce Python, sorted with the yarn comparator



I am trying to solve the word count problem, and I want the results in reverse sorted order by how frequently each word occurs in the file.

Here are the four files I wrote for this (2 mappers and 2 reducers, since a single MapReduce job cannot solve the problem):

1) mapper1.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
        print "%s\t%d" % (word.lower(), 1)

2) reducer1.py

import sys

current_key = None
word_sum = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != key:
        if current_key:
            print "%s\t%d" % (current_key, word_sum)
        word_sum = 0
        current_key = key
    word_sum += count

if current_key:
    print "%s\t%d" % (current_key, word_sum)

3) mapper2.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    print "%s\t%d" % (word, count)

4) reducer2.py

import sys

for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    print "%s\t%d" % (word, count)

Here are the two yarn commands I run in a bash environment:

OUT_DIR="wordcount_result_1"
NUM_REDUCERS=8
hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar 
-D mapred.jab.name="Streaming wordCount" 
-D mapreduce.job.reduces=${NUM_REDUCERS} 
-files mapper1.py,reducer1.py 
-mapper "python mapper1.py" 
-combiner "python reducer1.py" 
-reducer "python reducer1.py" 
-input /test/articles-part-short 
-output ${OUT_DIR} > /dev/null

OUT_DIR_2="wordcount_result_2"
NUM_REDUCERS=1
hdfs dfs -rm -r -skipTrash ${OUT_DIR_2} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar 
-D mapred.jab.name="Streaming wordCount Rating" 
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator 
-D map.output.key.field.separator=t 
-D mapreduce.partition.keycomparator.options=-k2,2nr 
-D mapreduce.job.reduces=${NUM_REDUCERS} 
-files mapper2.py,reducer2.py 
-mapper "python mapper2.py" 
-reducer "python reducer2.py" 
-input ${OUT_DIR} 
-output ${OUT_DIR_2} > /dev/null
hdfs dfs -cat ${OUT_DIR_2}/part-00000 | head

These commands do not give me the correct result. Can someone explain what is going wrong?

On the other hand, if in mapper2.py I print like this:

print "%dt%s" % (count, word)

and in reducer2.py I read like this:

count, word = line.strip().split('\t', 1)

and change the option in the second yarn command to:

-D mapreduce.partition.keycomparator.options=-k1,1nr

then it gives me the correct answer.

Why does it behave differently in these two cases?

Can someone help me understand the comparator options in Hadoop MapReduce?

The following will work:

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.job.name="Streaming wordCount rating" \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options='-k2nr' \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.map.tasks=1 \
    -D mapreduce.job.reduces=1 \
    -files mapper2.py,reducer2.py \
    -mapper "python mapper2.py" \
    -reducer "python reducer2.py" \
    -input /user/jovyan/assignment0_1563877099149160 \
    -output ${OUT_DIR} > /dev/null
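
What separates the failing attempt from the working ones is what the comparator actually sees. By default, streaming sets stream.num.map.output.key.fields=1, so only the text before the first tab (the word) becomes the map output key, and the count travels as the value; KeyFieldBasedComparator compares keys only, so -k2,2nr in the first attempt referred to a field the key simply did not contain. Printing the count first makes it field 1 of the key, which is why -k1,1nr works, and stream.num.map.output.key.fields=2 above pulls both fields into the key so that -k2nr can reach the numeric count. A small illustration of the split (toy values, default tab separator):

num_key_fields = 1                         # streaming's default
line = "hadoop\t42"                        # one mapper2.py output line
parts = line.split("\t")
key = "\t".join(parts[:num_key_fields])    # "hadoop" -- all the comparator sees
value = "\t".join(parts[num_key_fields:])  # "42" -- never compared
# With stream.num.map.output.key.fields=2 the key becomes "hadoop\t42",
# so -k2nr can sort the second key field numerically, descending.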
