我有n
线程同时运行。这些线程正在处理一个包含m
测试用例的列表。例如,线程n-1
正在处理项目m[i-1]
,而线程n
正在处理项目m[i]
。如果线程n-1
失败或返回信号,我希望停止所有线程。我怎样才能做到这一点?
这是MWE:
这是我的处理功能
def process(input_addr):
i =+ 1
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 '+input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
return True
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
return True
except Exception as e:
print("thread.nMessage:{1}".format(e))
这是我的游泳池:
def pre_run_test_files(self):
with Pool(10) as p:
p.map(process, self.test_files)
我正在使用:
from multiprocessing import Pool
您可以使用辅助函数process
,只需引发一个异常,并将error_callback函数与apply_async
一起使用,CCD_9调用池上的terminate
,如以下演示所示:
from multiprocessing import Pool
def process(i):
import time
time.sleep(1)
if i == 6:
raise ValueError(f'Bad value: {i}')
print(i, flush=True)
def my_error_callback(e):
pool.terminate()
print(e)
if __name__ == '__main__':
pool = Pool(4)
for i in range(20):
pool.apply_async(process, args=(i,), error_callback=my_error_callback)
# wait for all tasks to complete
pool.close()
pool.join()
打印:
0
1
3
2
4
5
7
Bad value: 6
您应该能够根据您的特定问题调整上述代码。
更新
因为您的原始代码使用了map
方法,所以还有第二个使用metidimap_unordered
的解决方案,它将返回一个迭代器,该迭代器在每次迭代时都会返回辅助函数的下一个返回值process
,或者如果辅助函数引发异常,则引发异常。使用方法imap_unordere
,这些结果以任意的完成顺序返回,而不是以任务提交顺序返回,但当使用默认的chunksize参数1时,该任意顺序通常是任务完成顺序。这就是您想要的,这样您就可以尽早检测到异常并终止池。当然,如果您关心process
的返回值,那么您将使用方法imap
,以便按任务提交顺序返回结果。但在这种情况下,如果当情况i==6是引发异常时,但该任务恰好是第一个完成的任务,则在为i==1至5提交的任务完成之前,其异常仍然无法返回。
在下面的代码中,使用了8的池大小,所有任务在打印它们的参数并返回之前都会先休眠1秒,但在i==6的情况下会立即引发异常的除外。使用imap_unordered
,我们有:
from multiprocessing import Pool
def process(i):
import time
# raise an exception immediately for i == 6 without sleeping
if (i != 6):
time.sleep(1)
else:
raise ValueError(f'Bad value: {i}')
print(i, flush=True)
if __name__ == '__main__':
pool = Pool(8)
results = pool.imap_unordered(process, range(20))
try:
# Iterate results as task complete until
# we are done or one raises an exeption:
for result in results:
# we don't care about the return value:
pass
except Exception as e:
pool.terminate()
print(e)
pool.close()
pool.join()
打印:
Bad value: 6
如果我们将对imap_unordered
的调用替换为对imap
的调用,则输出为:
0
1
2
3
4
5
Bad value: 6
第一个解决方案使用带有error_callback参数的apply_async
,允许在异常发生时立即执行和。如果您关心任务提交顺序中的结果,则可以将apply_async
返回的multiprocessing.AsyncResult
对象保存在列表中,并对这些对象调用get
。在RAISE_EXCEPTION
设置为True
,然后设置为CCD26:的情况下,尝试以下代码
from multiprocessing import Pool
import time
RAISE_EXCEPTION = True
def process(i):
if RAISE_EXCEPTION and i == 6:
raise ValueError(f'Bad value: {i}')
time.sleep(1)
return i # instead of printing
def my_error_callback(e):
global got_exception
got_exception = True
pool.terminate()
print(e)
if __name__ == '__main__':
got_exception = False
pool = Pool(4)
async_results = [pool.apply_async(process, args=(i,), error_callback=my_error_callback) for i in range(20)]
# Wait for all tasks to complete:
pool.close()
pool.join()
if not got_exception:
for async_result in async_results:
print(async_result.get())
我找到了解决方案:
def process(i, input_addr, event):
kill_flag = False
if not event.is_set():
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 '+input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
except Exception as e:
print("thread.nMessage:{1}".format(e))
if kill_flag:
event.set()
def manager():
p= multiprocessing.Pool(10)
m = multiprocessing.Manager()
event = m.Event()
for i,f in enumerate(self.test_files):
p.apply_async(process, (i, f, event))
p.close()
event.wait()
p.terminate()