摘要
使用Python,我想启动多个进程,这些进程并行运行具有不同参数的同一可执行文件。当所有的工作完成后,我想检查一下是否有错误,然后再做一些处理。
我尝试过的
我已经有了:
def main(path_of_script):
path_of_exe = make_path_to_exe(path_of_script)
#
lst_standin_params = [["1", "5"], ["2", "1"]]
#
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
#
future_standin_exe = {
executor.submit(
subprocess.Popen(
[path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]]
)
): standin_arg_lst for standin_arg_lst in lst_standin_params
}
#
for future in concurrent.futures.as_completed(future_standin_exe):
tmp_rv_holder = future_standin_exe[future]
#
try:
data = future.result()
except Exception as exc:
print('An exception occurred: %s' % (exc))
问题
进程运行良好,但在检查subprocess.Popen
启动的每个进程是否成功完成方面,我显然做错了什么。我想我需要一种方法来获取对subprocess.Popen
的调用的返回值,但我不确定如何获取。
当前代码在执行带有异常can't pickle _thread.lock objects
的行data = future.result()
时抛出异常。我确信尝试使用Future
对象是错误的想法,但我不知道如何访问执行结果。
您应该创建使用stdout=PIPE
和p.stdout.read()
捕获输出的函数
def func(path_of_exe, task_id, task_delay):
p = subprocess.Popen(
[path_of_exe, task_id, task_delay],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return [p.stdout.read(), p.stderr.read()]
然后在执行器中使用
future_standin_exe = {
executor.submit(
func, path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]
): standin_arg_lst for standin_arg_lst in lst_standin_params
}
它必须是func, arg1, arg2, arg3
,而不是func(arg1, arg2, arg3)
稍后您可以显示这两个输出。
data = future.result()
for item in data:
print(item)
或
stdout, stderr = future.result()
print('stdout:', stdout)
print('stderr:', stderr)
我用于测试的最小代码。
我没有任何程序可以运行,所以我使用了命令ls
,它提供了一些输出,但毫无用处。
import concurrent.futures
import subprocess
TASK_ID_IDX = 0
TASK_DELAY_IDX = 0
def func(path_of_exe, standin_arg_lst):
p = subprocess.Popen(
[path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return [p.stdout.read(), p.stderr.read()]
def make_path_to_exe(path):
return path
def main(path_of_script):
path_of_exe = make_path_to_exe(path_of_script)
#
lst_standin_params = [["1", "5"], ["2", "1"]]
#
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
#
future_standin_exe = {
executor.submit(
func, path_of_exe, standin_arg_lst
): standin_arg_lst for standin_arg_lst in lst_standin_params
}
#
for future in concurrent.futures.as_completed(future_standin_exe):
tmp_rv_holder = future_standin_exe[future]
#
try:
data = future.result()
for item in data:
print(item)
except Exception as exc:
print('An exception occurred: %s' % (exc))
main('dir')