ProcessPoolExecutor does not limit workers to the configured value



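I have many computational processes to run. They take anywhere from 20 minutes to more than a day. I want users to be able to watch what each one is doing via standard output, so I run each one in its own cmd window. When I set the number of workers, it does not respect that value and keeps spinning up new jobs until I cancel the program.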

def run_job(args):
    # opens a new cmd window that runs the command and redirects its output
    os.system('start cmd /k "{} > "{}\\stdout.txt""'.format(run_command,
                                                             outpath))

CONCURRENCY_HANDLER = concurrent.futures.ProcessPoolExecutor(max_workers=3)
jobs = []
ALL_RUNS_MATRIX = [{k1:v1...kn:vn},....
                   {kA1,vA1...kAn,vAn}
                   ]
with CONCURRENCY_HANDLER as executor:
    for idx, configuration in enumerate(ALL_RUNS_MATRIX):
        generate_run_specific_files(configuration, idx)
        args = [doesnt, matter]
        time.sleep(5)
        print("running new")
        jobs.append(executor.submit(run_job, args))
        time.sleep(10)

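I originally tried ThreadPoolExecutor, with the same result. Why does this not actually limit the number of jobs running at once, and if this cannot work, what should I use instead? I need to keep this "generate -> wait -> run" sequence because of the nature of the program (I change the file it reads for its configuration, it starts up, keeps all the necessary information in memory, and then executes), so I am wary of a model in which "workers pull their jobs from a queue as they become free".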

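I'm not quite sure what you are trying to do. Could you give us an example of a simple task that shows the same problem as your processes? Are you assuming that max_workers is an upper bound on the number of spawned processes? That may not be true. I think max_workers is the number of processor cores the process pool is allowed to use. According to the documentation: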

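If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is less than or equal to 0, then a ValueError will be raised. On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised. If max_workers is None, then the default chosen will be at most 61, even if more processors are available.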
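The lower bound quoted above can be checked directly. The following is only a minimal sketch: the helper name pool_accepts is hypothetical, and it does nothing more than report whether the constructor accepts a given max_workers value.

```python
from concurrent.futures import ProcessPoolExecutor


def pool_accepts(max_workers):
    """Return True if ProcessPoolExecutor accepts this max_workers value."""
    try:
        executor = ProcessPoolExecutor(max_workers=max_workers)
    except ValueError:
        # raised for values <= 0 (and, on Windows, for values above 61)
        return False
    executor.shutdown()  # no tasks were submitted, so there is nothing to wait for
    return True


if __name__ == "__main__":
    for n in (0, 3):
        print(n, pool_accepts(n))
```

This only exercises the argument validation; it says nothing about how many tasks run at the same time.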

Here is a simple example:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

futures = []

def job(i):
    print('Job started: ' + str(i))
    return i

def all_done():
    # True once every submitted future has finished
    done = True
    for ft in futures:
        done = done and ft.done()
    return done

# the guard is required on Windows, where worker processes re-import this module
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=8) as executor:
        for i in range(3):
            futures.append(executor.submit(job, i))
        while not all_done():
            sleep(0.1)
        for ft in futures:
            print('Job done: ' + str(ft.result()))

It prints:

Job started: 0
Job started: 1
Job started: 2
Job done: 0
Job done: 1
Job done: 2

Does this help?

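As I mentioned in a comment, the call to os.system returns as completed as soon as the start command has opened the new command window, even though the run command passed to cmd /K has only just begun running. The process in the pool is therefore immediately free to run another task.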

If I understand your problem correctly, you have the following goals:

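  1. Detect the true completion of a command, to ensure that no more than 3 commands run concurrently
  2. Collect each command's output in a window that stays open even after the command has completed. I infer this from your use of the /K switch when invoking cmd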

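My solution is to use windows created with tkinter to hold the output and to run the commands with subprocess.Popen using the argument shell=True. You can specify the additional argument stdout=PIPE to read a command's output and direct it to a tkinter window. How to actually do this is the challenge.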
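Reading a command's output through stdout=PIPE can be tried on its own, before any windows are involved. This is only a sketch: stream_lines and CHILD are hypothetical names, the child is a Python one-liner rather than a real job, and the command is passed as an argument list instead of using shell=True.

```python
import sys
from subprocess import Popen, PIPE


def stream_lines(args):
    """Yield each line of a command's standard output as it is produced."""
    with Popen(args, stdout=PIPE, text=True) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")


# stand-in child command (hypothetical): prints two lines and exits
CHILD = [sys.executable, "-c", "print('first'); print('second')"]

if __name__ == "__main__":
    for line in stream_lines(CHILD):
        print("got:", line)
```

The loop ends when the child closes its standard output, which is also how the caller learns that the command has really finished.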

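I have not done any tkinter programming before, so someone with more experience may find a more direct way. It appears to me that the windows need to be created and written to in the main thread. To that end, for each command to be executed, a window is created (a special subclass of Tk called CmdWindow) and paired with the command. The command and the output window number are passed to the worker function run_command along with an instance of queue.Queue. run_command then uses subprocess.Popen to execute the command, and for each line of output it reads from the output pipe, it writes a tuple to the queue containing the window number and the line to be written. The main thread reads these tuples in a loop and writes the lines to the appropriate window. Because the main thread is occupied with writing command output, a separate thread is used to create the thread pool, submit all the commands that need to run, and wait for them to complete. When all tasks have completed, a special "end" record is added to the queue, signifying that the main thread can stop reading from the queue. At that point the main thread displays a "Pausing for termination..." message and does not terminate until the user presses Enter at the console.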

from concurrent.futures import ThreadPoolExecutor, as_completed
from subprocess import Popen, PIPE
from tkinter import *
from tkinter.scrolledtext import ScrolledText
from queue import Queue
from threading import Thread

class CmdWindow(Tk):
    """ A console window """
    def __init__(self, cmd):
        super().__init__()
        self.title(cmd)
        self.configure(background="#BAD0EF")
        title = Entry(self, relief=FLAT, bg="#BAD0EF", bd=0)
        title.pack(side=TOP)
        textArea = ScrolledText(self, height=24, width=120, bg="#FFFFFF", font=('consolas', '14'))
        textArea.pack(expand=True, fill='both')
        textArea.bind("<Key>", lambda e: "break") # read only
        self._textArea = textArea

    def write(self, s):
        """ write the next line of output """
        self._textArea.insert(END, s)
        self.update()

def run_command(q, cmd, win):
    """ run command cmd with output window win """
    # special "create window" command:
    q.put((win, None)) # create the window
    with Popen(cmd, stdout=PIPE, shell=True, text=True) as proc:
        for line in iter(proc.stdout.readline, ''):
            # write line command:
            q.put((win, line))

def run_tasks(q, arguments):
    # we only need a thread pool since each command will be its own process:
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = []
        for win, cmd in arguments:
            futures.append(executor.submit(run_command, q, cmd, win))
        # each task doesn't currently return anything
        results = [future.result() for future in as_completed(futures)]
    q.put(None) # signify end

def main():
    q = Queue()
    # sample commands to execute (under Windows):
    cmds = ['dir *.py', 'dir *.html', 'dir *.txt', 'dir *.js', 'dir *.csv']
    # each command will get its own window for output:
    windows = list(cmds)
    # pair a command with a window number:
    arguments = enumerate(cmds)
    # create the thread for running the commands:
    thread = Thread(target=run_tasks, args=(q, arguments))
    # start the thread:
    thread.start()
    # wait for command output in main thread
    # output must be written from main thread
    while True:
        t = q.get() # get next tuple or special "end" record
        if t is None: # special end record?
            break # yes!
        # unpack tuple:
        win, line = t
        if line is None: # special create window command
            # use cmd as title and replace with actual window:
            windows[win] = CmdWindow(windows[win])
        else:
            windows[win].write(line)
    thread.join() # wait for run_tasks thread to end
    input('Pausing for termination...') # wait for user to be finished looking at windows

if __name__ == '__main__':
    main()
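The queue handoff between the worker threads and the main thread can also be looked at in isolation, without tkinter or subprocesses. In this minimal sketch the names worker, producer and collect are hypothetical, plain lists stand in for the output windows, and fixed lists of strings stand in for command output.

```python
from queue import Queue
from threading import Thread


def worker(q, win, lines):
    """Stand-in for run_command: tag each line with its window number."""
    for line in lines:
        q.put((win, line))


def producer(q, jobs):
    """Start one worker per job, wait for all of them, then post the end record."""
    threads = [Thread(target=worker, args=(q, win, lines))
               for win, lines in enumerate(jobs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    q.put(None)  # end record: nothing more will arrive


def collect(jobs):
    """Main-thread loop: read tuples until the end record shows up."""
    q = Queue()
    outputs = [[] for _ in jobs]  # stand-ins for the output windows
    feeder = Thread(target=producer, args=(q, jobs))
    feeder.start()
    while True:
        item = q.get()
        if item is None:
            break
        win, line = item
        outputs[win].append(line)
    feeder.join()
    return outputs
```

Lines from different workers may interleave on the queue, but each worker's own lines stay in order, so every window still receives its output in sequence.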
