从多个进程捕获返回值



摘要

使用Python,我想启动多个进程,这些进程并行运行具有不同参数的同一可执行文件。当所有的工作完成后,我想检查一下是否有错误,然后再做一些处理。

我尝试过的

我已经有了:

def main(path_of_script):
path_of_exe = make_path_to_exe(path_of_script)
#
lst_standin_params = [["1", "5"], ["2", "1"]]
#
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
#
future_standin_exe = { 
executor.submit(
subprocess.Popen(
[path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]]
)
): standin_arg_lst for standin_arg_lst in lst_standin_params 
}
#
for future in concurrent.futures.as_completed(future_standin_exe):
tmp_rv_holder = future_standin_exe[future]
#
try:
data = future.result()
except Exception as exc:
print('An exception occurred: %s' % (exc))

问题

进程运行良好,但在检查subprocess.Popen启动的每个进程是否成功完成方面,我显然做错了什么。我想我需要一种方法来获取对subprocess.Popen的调用的返回值,但我不确定如何获取。

当前代码在执行带有异常can't pickle _thread.lock objects的行data = future.result()时抛出异常。我确信尝试使用Future对象是错误的想法,但我不知道如何访问执行结果。

您应该创建使用stdout=PIPEp.stdout.read()捕获输出的函数

def func(path_of_exe, task_id, task_delay):

p = subprocess.Popen(
[path_of_exe, task_id, task_delay],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,        
)

return [p.stdout.read(), p.stderr.read()]

然后在执行器中使用

future_standin_exe = { 
executor.submit(
func, path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]
): standin_arg_lst for standin_arg_lst in lst_standin_params 
}

它必须是func, arg1, arg2, arg3,而不是func(arg1, arg2, arg3)

稍后您可以显示这两个输出。

data = future.result()
for item in data:
print(item)

stdout, stderr = future.result()
print('stdout:', stdout)
print('stderr:', stderr)

我用于测试的最小代码。

我没有任何程序可以运行,所以我使用了命令ls,它提供了一些输出,但毫无用处。

import concurrent.futures
import subprocess
TASK_ID_IDX = 0
TASK_DELAY_IDX = 0
def func(path_of_exe, standin_arg_lst):
p = subprocess.Popen(
[path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,        
)
return [p.stdout.read(), p.stderr.read()]
def make_path_to_exe(path):
return path
def main(path_of_script):
path_of_exe = make_path_to_exe(path_of_script)
#
lst_standin_params = [["1", "5"], ["2", "1"]]
#
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
#
future_standin_exe = { 
executor.submit(
func, path_of_exe, standin_arg_lst
): standin_arg_lst for standin_arg_lst in lst_standin_params 
}
#

for future in concurrent.futures.as_completed(future_standin_exe):
tmp_rv_holder = future_standin_exe[future]
#
try:
data = future.result()
for item in data:
print(item)
except Exception as exc:
print('An exception occurred: %s' % (exc))

main('dir')                

最新更新