我正试图使用concurrent.futures用mapreduce做一个单词计数器,以前我做过多线程版本,但速度太慢了,因为受CPU限制。我已经完成了映射部分,将单词划分为['word1',1],['word2,1],[`word1,1],['word3',1]以及进程之间的映射,因此每个进程都会处理文本文件的一部分。下一步("shuffling"(是将这些单词放在字典中,使其看起来像这样:单词1:[1,1],单词2:[1],单词3:[1],但我不能在进程之间共享字典,因为我们使用的是多处理而不是多线程,所以我如何使每个进程添加"shuffle";1〃;所有进程之间共享的字典?我陷入了困境,无法继续。我现在是:
import sys
import re
import concurrent.futures
import time
# Read text file
def input(index):
try:
reader = open(sys.argv[index], "r", encoding="utf8")
except OSError:
print("Error")
sys.exit()
texto = reader.read()
reader.close()
return texto
# Convert text to list of words
def splitting(input_text):
input_text = input_text.lower()
input_text = re.sub('[,.;:!¡?¿()]+', '', input_text)
words = input_text.split()
n_processes = 4
# Creating processes
with concurrent.futures.ProcessPoolExecutor() as executor:
results = []
for id_process in range(n_processes):
results.append(executor.submit(mapping, words, n_processes, id_process))
for f in concurrent.futures.as_completed(results):
print(f.result())
def mapping(words, n_processes, id_process):
word_map_result = []
for i in range(int((id_process / n_processes) * len(words)),
int(((id_process + 1) / n_processes) * len(words))):
word_map_result.append([words[i], 1])
return word_map_result
if __name__ == '__main__':
if len(sys.argv) == 1:
print("Please, specify a text file...")
sys.exit()
start_time = time.time()
for index in range(1, len(sys.argv)):
print(sys.argv[index], ":", sep="")
text = input(index)
splitting(text)
# for word in result_dictionary_words:
# print(word, ':', result_dictionary_words[word])
print("--- %s seconds ---" % (time.time() - start_time))
我已经看到,在进行并发编程时,通常最好尽可能避免使用共享状态,那么我如何在不在进程之间共享字典的情况下实现Map reduce字数呢?
您可以使用多处理中的管理器创建共享字典。我从你的节目中了解到,这是你需要分享的word_map_result
。
你可以试试这种
from multiprocessing import Manager
...
def splitting():
...
word_map_result = Manager().dict()
with concurrent.futures.....:
...
results.append(executor.submit(mapping, words, n_processes, id_process, word_map_result)
...
...
def mapping(words, n_processes, id_process, word_map_result):
for ...
# Do not return anything - word_map_result is up to date in your main process
基本上,您将从映射函数中删除word_map_result的本地副本,并将其作为参数传递给Manager实例。这个word_map_result现在在所有子流程和主程序之间共享。不过,管理人员增加了数据传输开销,所以这可能对您没有太大帮助。
在这种情况下,您不从worker返回任何内容,因此也不需要for循环来处理主程序中的结果——您的word_map_result在所有子流程和主程序中都是相同的。
我可能误解了你的问题,我不熟悉算法,如果可以重新设计它,这样你就不需要在流程之间共享任何东西。
使用多处理似乎是一种误解。首先,创建池和在进程之间传递数据会产生开销。如果您决定使用工作函数mapping
可以用来存储其结果的共享托管字典,请知道托管字典使用的是代理,其访问速度相当慢。使用托管字典的另一种选择是使用当前的字典,即mapping
返回一个列表,主进程使用这些结果来创建字典的键和值。但是,mapping
返回一个列表的意义是什么,其中每个元素总是两个元素的列表,其中第二个元素总是常数值1?这不是相当浪费时间和空间吗?
我认为你的性能不会比仅仅实现拆分更快(可能更慢(:
# Convert text to list of words
def splitting(input_text):
input_text = input_text.lower()
input_text = re.sub('[,.;:!¡?¿()]+', '', input_text)
words = input_text.split()
results = {}
for word in words:
results[word] = [1]
return results