在 Python 的concurrent.futures
标准模块中,为什么ProcessPoolExecutor
中正在运行的调用数是max_workers + 1
而不是像ThreadPoolExecutor
中那样max_workers
?仅当提交的调用数严格大于池工作进程数时,才会发生这种情况。
以下 Python 代码片段在ProcessPoolExecutor
中向 2 个工作线程提交 8 次调用:
import concurrent.futures
import time
def call():
while True:
time.sleep(1)
if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(call) for _ in range(8)]
time.sleep(5)
for future in futures:
print(future.running())
打印以下内容(3 个正在运行的调用;由于有 2 个工作线程,因此意外):
真真
真 假
使用ThreadPoolExecutor
打印此内容时(2 个正在运行的调用;预期):
真
假
假 假
好吧,我不会太相信这种running()
方法。似乎它并没有真正反映实际的运行状态。
确保进程状态的最佳方法是使它们打印/更新某些内容。我选择使用multiprocessing.Manager().dict()
对象创建共享字典。
可以从任何进程安全地查阅/更新此进程同步对象,并且即使在多处理环境中也具有共享状态。
每次启动进程时,使用 PID 作为键并将True
作为值来更新共享字典。在退出时设置False
。
import concurrent.futures
import multiprocessing
import time,os
def call(shared_dict):
shared_dict[os.getpid()] = True
print("start",shared_dict)
time.sleep(10)
shared_dict[os.getpid()] = False
print("end",shared_dict)
if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
shared_dict = multiprocessing.Manager().dict()
futures = [executor.submit(call,shared_dict) for _ in range(8)]
time.sleep(5)
for future in futures:
print(future.running())
这是我得到的输出:
start {3076: True}
start {9968: True, 3076: True}
True
True
True
True
True
False
False
False
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
end {9968: False, 3076: False}
如您所见,我有 5 个正在运行的进程。而我的字典清楚地表明
- 同时运行的进程不超过 2 个
- 进程在开始时只创建一次,然后重用于执行进一步的调用(毕竟它是一个池)
让我们检查一下非常简约的文档:
running() 如果调用当前正在执行且无法取消,则返回 True。
它似乎反映了与取消Future
对象未来执行的可能性相关的状态(因为它尚未正确初始化/连接到通信队列,并且仍然是时候取消它了),而不是进程本身的实际"正在运行"状态。
这可能就是源代码中这个注释在定义set_running_or_notify_cancel
的含义:
将未来标记为正在运行或处理任何取消通知。
如果未来已被取消(cancel() 被调用并返回 True),则任何等待未来完成的线程(尽管调用 as_completed() 或 wait()))都会收到通知并返回 False。
如果 future 没有被取消,那么它就会处于运行状态(未来对 running() 的调用将返回 True),并且返回 True。
再一次,我们了解到,最好要求子流程协作,发布其状态,而不是试图使用记录不明确的方法来勒索它。