我想更改我的map reduce文件,以输出文本块中的顶部bigram,而不是单词计数,因此单词和bigram计数都是
这是我目前的代码和方法。
地图:
import sys
for line in sys.stdin:
line = line.strip()
words = line.split() #bigrams = line.split()
for word in words: #for bigram in words
print '%st%s' % (word,1) #print ... word pair???
减少:
mydict = dict()
for line in sys.stdin:
(word,cnt) = line.strip().split('t') #bigram and bigram count
mydict[word] = mydict.get(word,0) 1
for word,cnt in mydict.items():
print word,cnt #print bigram and bigram count
谢谢。
我认为nltk是一种流行的计算bigram的解决方案,即使在我的mapreduce格式中,我也应该采用这种方法吗?
我不会使用stdin和stdout。我宁愿抛出multiprocessing
,并从一些保存的文件中读取:
import multiprocessing as mp
def main(infilepath):
bgqIn, bgqOut = [mp.Queue() for _ in xrange(2)]
procs = [mp.Process(target=mapper, args=(bgqIn, bgqOut)) for _ in xrange(mp.cpu_count())]
for p in procs:
p.start()
with open(infilepath) as infile:
first = ''
second = ''
for line in infile:
line = line.lower()
for word in line.split():
first = second
second = word
bigram = (first, second)
bgqIn.put(bigram)
for p in procs:
bgqIn.put(None)
rqs = [(mp.Queue() for _ in xrange(2)) for i in xrange(mp.cpu_count())]
rprocs = [mp.Process(target=reducer, args=(*rqs[i])) for i in xrange(mp.cpu_count())]
for p in rprocs:
p.start()
qmap = {}
for char in xrange(97,123):
qmap[ord(char)] = rqs[(char-97)/len(rqs)]
dones = 0
while dones != len(procs):
t = bgqOut.get()
if t is None:
dones += 1
else:
qmap[t[0][0]].put(t)
for q in rqs:
q.put(None)
answer = {}
for q in rqs:
for bg,count in iter(q.get, None):
if bg not in answer:
answer[bg] = 0
answer[bg] += count
for bg,count in answer.iteritems():
print "There are", count, "occurrences of", bg
def mapper(qIn, qOut):
counts = {}
for bg in iter(qIn.get, None):
if bg not in counts:
counts[bg] = 0
counts[bg] += 1
for k,v in counts.iteritems():
qOut.put((k,v))
qOut.put(None)
def reducer(qIn, qOut):
counts = {}
for bg,count in iter(qIn.get, None):
if bg not in counts:
counts[bg] = 0
counts[bg] += count
for bg,count in counts.iteritems():
qOut.put((bg,count))
qOut.put(None)
我还没有测试过这个,但它是一个基本的骨架,应该会让你开始。