Python concurrent.futures 使用子进程,运行多个 python 脚本



我想使用 concurrent.futures 同时运行多个 python 脚本。 我的代码的串行版本去文件夹中查找特定的python文件并执行它。

import re
import os
import glob
import re
from glob import glob
import concurrent.futures as cf
FileList = [];
import time
FileList = [];
start_dir = os.getcwd();
pattern   = "Read.py"
for dir,_,_ in os.walk(start_dir):
FileList.extend(glob(os.path.join(dir,pattern))) ;
FileList
i=0
for file in FileList:
dir=os.path.dirname((file))
dirname1 = os.path.basename(dir) 
print(dirname1)
i=i+1
Str='python '+ file
print(Str)
completed_process = subprocess.run(Str)`

对于我的代码的并行版本:

def Python_callback(future):
print(future.run_type, future.jid)
return "One Folder finished executing"
def Python_execute():
from concurrent.futures import ProcessPoolExecutor as Pool
args = FileList
pool = Pool(max_workers=1)
future = pool.submit(subprocess.call, args, shell=1)
future.run_type = "run_type"
future.jid = FileList
future.add_done_callback(Python_callback)
print("Python executed")
if __name__ == '__main__':
import subprocess
Python_execute()

问题是我不确定如何将文件列表的每个元素传递到单独的cpu

提前感谢您的帮助

最小的更改是对每个元素使用一次submit,而不是对整个列表使用一次:

futures = []
for file in FileList:
future = pool.submit(subprocess.call, file, shell=1)
future.blah blah
futures.append(future)

只有当你想对期货做一些事情时,futures列表才是必需的——等待它们完成,检查它们的返回值,等等。

同时,您正在使用max_workers=1显式创建池。毫不奇怪,这意味着您只会获得 1 个工作子进程,因此它最终会等待一个子进程完成,然后再抓取下一个子进程。如果您想实际同时运行它们,请删除该max_workers并让它默认为每个内核一个(或者传递max_workers=8或其他一些不1的数字,如果您有充分的理由覆盖默认值)。


在我们这样做的同时,有很多方法可以简化您正在做的事情:

  • 你真的需要multiprocessing吗?如果您需要与每个子进程进行通信,那么在单个线程中执行此操作可能会很痛苦 - 但是线程,或者可能是asyncio,在这里可以和进程一样工作。
  • 更重要的是,看起来你实际上不需要任何东西,只需要启动进程并等待它完成,这可以通过简单的同步代码完成。
  • 为什么要构建字符串并使用shell=1而不是只传递列表而不使用 shell?不必要地使用 shell 会产生开销、安全问题和调试烦恼。
  • 你真的不需要每个未来的jid- 它只是所有调用字符串的列表,这没有用。可能更有用的是某种标识符,或子进程返回代码,或...可能还有很多其他的事情,但它们都是可以通过读取subprocess.call的返回值或简单包装器来完成的事情。
  • 你也真的不需要回调。如果您只是将所有期货收集在一个列表中并as_completed,则可以打印结果,因为它们显示得更简单。
  • 如果您同时执行上述两项操作,则循环内只剩下pool.submit,这意味着您可以用pool.map替换整个循环。
  • 您很少需要或想要混合os.walkglob。当您实际具有 glob 模式时,请fnmatch应用于os.walk中的files列表。但是在这里,您只是在每个目录中查找特定的文件名,所以实际上,您需要过滤的只是file == 'Read.py'.
  • 您没有在循环中使用i。但是,如果您确实需要它,最好执行for i, file in enumerate(FileList):而不是执行for file in FileList:并手动递增i

相关内容

  • 没有找到相关文章

最新更新