一旦任何进程遇到错误,我如何终止python中的异步星图多处理池



我使用的是Python多处理库中的starmap_async函数。但是,我注意到,如果我的代码在其中一个进程中遇到错误,则在所有进程完成之前不会抛出异常。这是相关代码:

from multiprocessing import Pool, cpu_count
import datetime
import itertools
import time
with Pool(max(cpu_count()//2, 1)) as p:
#p = Pool()
df_iter = df_options.iterrows()
ir = itertools.repeat
results = p.starmap_async(_run,zip(df_iter,ir(fixed_options),ir(outputs_grab)), chunksize=1)
p.close() #no more jobs to submit

#Printing progress

n_remaining = results._number_left + 1
while (not results.ready()):
time.sleep(1)
#Check for errors here ... How ????
#Then what? call terminate()????? 
if verbose:
if results._number_left < n_remaining:
now = datetime.datetime.now()
n_remaining = results._number_left
print('%d/%d  %s' % (n_remaining,n_rows,str(now)[11:]))

print('joining')
p.join()

all_results = results.get()

df = pd.DataFrame(all_results)

目前,如果我在派生的进程中引发错误,那么其他进程似乎不仅完成了运行,而且启动了新任务,尽管其中一个调用出现了错误。

一些搜索让我相信这可能是不可能的。有人似乎建议我可能需要使用concurrent.futures,尽管尚不清楚如何将我的示例映射到该示例,尤其是在流程完成时保持实时反馈。

CCD_ 3的讨论:https://stackoverflow.com/a/47108581/764365

tldr;使用imap_unordered可以使主进程在知道子进程抛出异常的情况下具有最小的延迟,因为它允许您在主进程中通过结果Queue处理结果。然后,您可以使用包装器函数来构建自己的";星形;函数的版本(如果您愿意(。作为代码设计的一个要点,大多数Pool方法倾向于从子级重新引发异常,而concurrent.futures倾向于设置返回值的属性以指示引发的异常。

from random import random
from functools import partial
from multiprocessing import Pool
from time import sleep
def foo(a, b):
sleep(random()) #introduce some processing delay to simulate work
if random() > .95:
raise Exception("randomly rasied an exception")
else:
return f"{a}t{b}"
def star_helper(func, args):
return func(*args)
if __name__ == "__main__":
n = 20
print("chance of early termination:", (1-.95**n)*100, "%")
with Pool() as p:
try:
for result in p.imap_unordered(partial(star_helper, foo), zip(range(n), range(n))):
print(result)
except:
p.terminate()
print("terminated")
print("done") # `with Pool()` joins the child processes to prove they quit early

相关内容

  • 没有找到相关文章