Python multiprocessing .get() never returns



I'm posting because I've run into a multiprocessing problem in my Python script. My code is:

def filter_list_of_list_values(myList,myFilterList):
    for index in range(len(myList)):
        print(index)
        sub_array = myList[index]
        for stopword in myFilterList:
            sub_array = list(filter(lambda a: a != stopword, sub_array))
        sub_array = [w for w in sub_array if not w in myFilterList]
        myList[index] = sub_array
    return myList

import multiprocessing
import numpy as np
#We are going to use multiprocessing and split the list into as many parts as processes used:
N_PROCS = 6
#Lists splitting:
L_sub_lists  = np.array_split(tokenized_text, N_PROCS)

final_List = []
start_time = time.time()

print('Creating pool with %d processes\n' % N_PROCS)
with multiprocessing.Pool(N_PROCS) as pool:
    #We initialize a list of tasks which each call the same function, but
    #with a different list
    TASKS = [(sub_list, english_stopwords) for sub_list in L_sub_lists]
    print("TASK OK")
    results = [pool.apply_async(filter_list_of_list_values, t) for t in TASKS]
    print("results OK")

    final_results = [r.get() for r in results]
    print("final_results OK")
    for sub_list_res in final_results:
        print("appending")
        final_List += sub_list_res
        print("list_append")

print("--- %s seconds ---" % (time.time() - start_time))
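np.array_split returns a list of NumPy sub-arrays, not Python lists, so each worker receives an ndarray rather than the list of lists that filter_list_of_list_values was written for, and ragged rows of tokens are awkward to hold in an array anyway. A minimal pure-Python alternative, assuming tokenized_text is a plain list of token lists; `split_into_chunks` is a hypothetical helper that sizes the chunks the way np.array_split does (the first `len % n` chunks get one extra item):

```python
def split_into_chunks(items, n):
    """Split a list into n consecutive chunks whose sizes differ by at most one.

    Hypothetical helper: chunk sizes follow np.array_split (the first
    len(items) % n chunks get one extra item), but chunks stay plain lists.
    """
    if n <= 0:
        raise ValueError("n must be a positive integer")
    base, extra = divmod(len(items), n)
    chunks = []
    start = 0
    for i in range(n):
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


rows = [['Paris', 'Lyon'], ['Rome'], ['Turin', 'Naple', 'Chicago'], ['Monaco']]
for chunk in split_into_chunks(rows, 3):
    print(chunk)
```

Under that assumption, `L_sub_lists = split_into_chunks(tokenized_text, N_PROCS)` would take the place of the np.array_split call, and the rest of the pool code would stay the same.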

The script gets stuck at:

final_results = [r.get() for r in results]

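I really don't understand why, because I have used the same script, with a few small differences, in another context (a different function, applied to a DataFrame instead of a list of lists), and everything worked fine.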
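One way to narrow down a hang like this, offered as a debugging sketch rather than a fix: AsyncResult.get() accepts an optional timeout and raises multiprocessing.TimeoutError if the result is not ready in time, so the script fails loudly instead of blocking forever. `slow_task` and `collect_with_timeout` are hypothetical names. The `if __name__ == "__main__":` guard is needed when workers are started with the "spawn" method (the default on Windows and macOS); the multiprocessing documentation also notes that the `__main__` module must be importable by the child processes, which is not always the case in an interactive interpreter or notebook.

```python
import multiprocessing
import time


def slow_task(seconds):
    # Hypothetical stand-in for a worker function: sleep, then return a value
    time.sleep(seconds)
    return seconds * 10


def collect_with_timeout(async_results, timeout):
    # Wait at most `timeout` seconds for each result, in submission order;
    # raises multiprocessing.TimeoutError instead of blocking forever
    return [r.get(timeout=timeout) for r in async_results]


if __name__ == "__main__":  # required with the "spawn" start method
    with multiprocessing.Pool(2) as pool:
        fast = [pool.apply_async(slow_task, (s,)) for s in (0, 1)]
        print(collect_with_timeout(fast, timeout=10))  # [0, 10]
        stuck = pool.apply_async(slow_task, (30,))
        try:
            collect_with_timeout([stuck], timeout=1)
        except multiprocessing.TimeoutError:
            print("task still running after 1 s")
    # Leaving the with-block calls pool.terminate(), which stops the slow worker
```

If the fast tasks also time out, the workers are probably not running the function at all, which points at the setup (start method, main-module guard, picklability of the arguments) rather than at the filtering logic.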

An example:

L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]
filter_list_of_list_values(L,['Lyon','Turin','Chicago'])

results in:

[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille'], ['New-York', 'NapleWashington', 'Las Vegas'], ['Rome', 'Venise', 'Naple']]
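For this example, the two filtering passes in filter_list_of_list_values (one filter() call per stopword, then a list comprehension) do the same job twice, and the function overwrites the caller's rows in place. A minimal non-mutating sketch that makes a single pass against a set; `filter_stopwords` is a hypothetical name, not part of the original code:

```python
def filter_stopwords(list_of_lists, stopwords):
    # Build the set once so each membership test is O(1) on average
    stop = set(stopwords)
    # Return new lists; the caller's data is left untouched
    return [[w for w in row if w not in stop] for row in list_of_lists]


L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
     ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
     ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]
print(filter_stopwords(L, ['Lyon', 'Turin', 'Chicago']))
# [['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille'], ['New-York', 'NapleWashington', 'Las Vegas'], ['Rome', 'Venise', 'Naple']]
```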

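So I took a look at this, and your code seems to be almost correct. However, you left out how you actually build the input. Your sample code fails because you don't import time and you never define tokenized_text, and I don't know what the input is actually supposed to look like. But based on your example, your code does work, so I suspect that whatever you are doing to build the input is incorrect.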
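Since the suspicion is on the input rather than on the pool, a quick sanity check before dispatching the tasks can help. The sketch assumes the function should receive a plain list whose rows are lists of str; `describe_bad_rows` is a hypothetical helper, and an input that is actually a NumPy array would be reported as "ndarray".

```python
def describe_bad_rows(data, limit=5):
    """Hypothetical check: list up to `limit` problems with `data`,
    which is expected to be a list of lists of str. Empty list means OK."""
    if not isinstance(data, list):
        return ["top level is %s, expected list" % type(data).__name__]
    problems = []
    for i, row in enumerate(data):
        if not isinstance(row, list):
            problems.append("row %d is %s, expected list" % (i, type(row).__name__))
        elif not all(isinstance(w, str) for w in row):
            problems.append("row %d contains non-str items" % i)
        if len(problems) >= limit:
            break
    return problems


print(describe_bad_rows([['Paris', 'Lyon'], ('Rome',), ['Turin', 3]]))
# ['row 1 is tuple, expected list', 'row 2 contains non-str items']
```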

Here is a basic working version of the code:

import time
import multiprocessing
import numpy as np

N_PROCS = 6
# L_sub_lists = np.array_split(tokenized_text, N_PROCS)
final_List = []
L = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
     ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
     ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]
filter_list = ['Lyon', 'Turin', 'Chicago']


def filter_list_of_list_values(myList, myFilterList):
    for index in range(len(myList)):
        sub_array = myList[index]
        for stop_word in myFilterList:
            sub_array = list(filter(lambda a: a != stop_word, sub_array))
        sub_array = [w for w in sub_array if w not in myFilterList]
        myList[index] = sub_array
    return myList


# The guard is required when workers are started with "spawn" (Windows, macOS)
if __name__ == '__main__':
    start_time = time.time()
    # Serial run on a copy, because the function modifies its argument in place
    print(filter_list_of_list_values([list(row) for row in L], filter_list))
    print('Creating pool with %d processes\n' % N_PROCS)
    with multiprocessing.Pool(N_PROCS) as pool:
        TASKS = [([sub_list], filter_list) for sub_list in L]
        print("TASK OK")
        results = [pool.apply_async(filter_list_of_list_values, t) for t in TASKS]
        print("results OK")
        print("Getting final results")
        final_results = [r.get() for r in results]
        print("final_results OK")
    print("Printing final_results %s" % final_results)
    print("--- %s seconds ---" % (time.time() - start_time))

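It basically just breaks the larger list into smaller lists (here, one single-row list per task, which is why each result in the output below has an extra level of nesting) and processes those in child processes. I ran the main function once beforehand to check that it produces the output you expect, and then checked that the distributed processing returns the same result. I think that's the point, although I'm not sure, because beyond "this code doesn't work" and "I expect these outputs", the question isn't very clear.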
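The check described above (run the function once serially, run it in a pool, compare the two) can be sketched with Pool.starmap, which unpacks each argument tuple into a call and returns the results in input order. Because each task returns a list of rows, the per-task results are flattened with itertools.chain.from_iterable so they compare equal to the serial result. `filter_rows` and `filter_in_pool` are hypothetical names; `filter_rows` is a set-based stand-in for filter_list_of_list_values.

```python
import multiprocessing
from itertools import chain


def filter_rows(rows, stopwords):
    # Set-based stand-in for filter_list_of_list_values (hypothetical name)
    stop = set(stopwords)
    return [[w for w in row if w not in stop] for row in rows]


def filter_in_pool(chunks, stopwords, processes=2):
    # Each chunk is a list of rows and each worker returns a list of rows,
    # so the per-chunk results are flattened back into a single list of rows
    with multiprocessing.Pool(processes) as pool:
        per_chunk = pool.starmap(filter_rows, [(chunk, stopwords) for chunk in chunks])
    return list(chain.from_iterable(per_chunk))


if __name__ == "__main__":
    data = [['Paris', 'Monaco', 'Washington', 'Lyon', 'Venise', 'Marseille'],
            ['New-York', 'NapleWashington', 'Turin', 'Chicago', 'Las Vegas'],
            ['Lyon', 'Rome', 'Chicago', 'Venise', 'Naple', 'Turin']]
    stop = ['Lyon', 'Turin', 'Chicago']
    serial = filter_rows(data, stop)
    parallel = filter_in_pool([[row] for row in data], stop)  # one row per task
    assert parallel == serial
    print(parallel)
```

starmap blocks until every result has arrived and takes no timeout; if a bounded wait is needed, `pool.starmap_async(...).get(timeout=...)` is the asynchronous counterpart.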

Here is the script output:

[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille'], ['New-York', 'NapleWashington', 'Las Vegas'], ['Rome', 'Venise', 'Naple']]
Creating pool with 6 processes
TASK OK
results OK
Getting final results
final_results OK
Printing final_results [[['Paris', 'Monaco', 'Washington', 'Venise', 'Marseille']], [['New-York', 'NapleWashington', 'Las Vegas']], [['Rome', 'Venise', 'Naple']]]
