我是这个多处理概念的新手。我正在尝试对拼写函数实现多处理,以使其运行得更快。我按照下面的顺序进行了尝试,但没有按照之前的顺序得到结果,token
这里是标记化句子的巨大列表。
from spellchecker import SpellChecker
from wordsegment import load, segment
from timeit import default_timer as timer
from multiprocessing import Process, Pool, Queue, Manager
def text_similarity_spellings(self, token):
"""Uses spell checker to separate incorrect spellings and correct them"""
spell = SpellChecker()
unknown_words = [list(spell.unknown(word)) for word in token]
known_words = [list(spell.known(word)) for word in token]
load()
segmented = [[segment(word) for word in sub] for sub in unknown_words]
flat_list = list(self.unpacker(segmented))
new_list = [[known_words[x], flat_list[x]] for x in range(len(known_words))]
new_list = list(self.unpacker(new_list))
newlist = [sorted(set(mylist), key=lambda x: mylist.index(x)) for mylist in new_list]
return newlist
def run_all(self):
tread_vta = Manager().list()
processes = []
arg_split = np.array_split(np.array(token),10)
arg_tr_cl = []
finds = []
trdclean1 = []
for count, k in enumerate(arg_split):
arg_tr_cl.append((k, [], tread_vta, token[t]))
for j in range(len(arg_tr_cl)):
p = Process(target= self.text_similarity_spellings, args=arg_tr_cl[j])
p.start()
processes.append(p)
for p in processes:
p.join()
有人能给我推荐一种更好的方法来将多处理应用于特定函数并以正确的顺序获得结果吗?
首先,在创建流程时有一定的开销,然后在将参数从主流程传递到子流程时又有更多的开销;生命;在另一个地址空间中,并返回返回值(顺便说一句,您没有为实际从辅助函数text_similarity_spellings
返回返回值做任何准备(。因此,为了从使用多处理中获利,并行执行任务(调用辅助函数(的收益必须足以抵消上述额外成本。所有这些只是一种说法,即您的工作函数必须具有足够的CPU密集度,才能证明多处理是合理的。
其次,考虑到创建流程的成本,您不希望创建超出您可能使用范围的流程。如果您有N
任务要完成(arg_tr_cl
的长度(,M
CPU处理器要在和上运行它们,那么您的工作函数是纯CPU(不涉及I/O(,那么尝试使用超过M
的进程来运行这些任务永远不会有任何收获。然而,如果他们确实结合了一些I/O,那么使用更多的流程可能会有利可图。如果涉及大量I/O,而只涉及一些CPU密集型处理,那么使用多线程和多处理的组合可能是可行的。最后,如果辅助函数主要是I/O,那么多线程就是您想要的。
有一种解决方案可以使用X
进程(基于您已确定的X
的任何值(来完成N
任务和,从而能够从工作函数中获取返回值,即使用大小为X
的进程池。
MULTITHREADING = False
n_tasks = len(arg_tr_cl)
if MULTITHREADING:
from multiprocessing.dummy import Pool
# To use multithreading instead (we can use a much larger pool size):
pool_size = min(n_tasks, 100) # 100 is fairly arbitrary
else:
from multiprocessing import Pool, cpu_count
# No point in creating pool size larger than the number of tasks we have
# Otherwise, assuming we are mostly CPU-intensive, just create pool size
# equal to the number of cpu cores that we have:
n_processors = cpu_count()
pool_size = min(n_tasks, n_processors)
pool = Pool(pool_size)
return_values = pool.map(self.text_similarity_spellings, arg_tr_cl)
# You can now iterate return_values to get the return values:
for return_value in return_values:
...
# or create a list, for example: return_values = list(return_values)
但是,如果每次调用都必须在外部字典中读取,那么SpellChecker
可能正在执行大量I/O操作。如果是这样的话,您的最佳性能难道不可能是初始化SpellChecker
一次,然后只循环检查每个字,而完全忘记多处理(或多线程(吗