在 Python 中使用 ProcessPoolExecutor 运行的调用次数不正确



在 Python 的concurrent.futures标准模块中,为什么ProcessPoolExecutor中正在运行的调用数是max_workers + 1而不是像ThreadPoolExecutor中那样max_workers?仅当提交的调用数严格大于池工作进程数时,才会发生这种情况。

以下 Python 代码片段在ProcessPoolExecutor中向 2 个工作线程提交 8 次调用:

import concurrent.futures
import time

def call():
while True:
time.sleep(1)

if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(call) for _ in range(8)]
time.sleep(5)
for future in futures:
print(future.running())

打印以下内容(3 个正在运行的调用;由于有 2 个工作线程,因此意外):


真 假





使用ThreadPoolExecutor打印此内容时(2 个正在运行的调用;预期):





假 假


好吧,我不会太相信这种running()方法。似乎它并没有真正反映实际的运行状态。

确保进程状态的最佳方法是使它们打印/更新某些内容。我选择使用multiprocessing.Manager().dict()对象创建共享字典。

可以从任何进程安全地查阅/更新此进程同步对象,并且即使在多处理环境中也具有共享状态。

每次启动进程时,使用 PID 作为键并将True作为值来更新共享字典。在退出时设置False

import concurrent.futures
import multiprocessing
import time,os

def call(shared_dict):
shared_dict[os.getpid()] = True
print("start",shared_dict)
time.sleep(10)
shared_dict[os.getpid()] = False
print("end",shared_dict)

if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
shared_dict = multiprocessing.Manager().dict()
futures = [executor.submit(call,shared_dict) for _ in range(8)]
time.sleep(5)
for future in futures:
print(future.running())

这是我得到的输出:

start {3076: True}
start {9968: True, 3076: True}
True
True
True
True
True
False
False
False
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
end {9968: False, 3076: False}

如您所见,我有 5 个正在运行的进程。而我的字典清楚地表明

  • 同时运行的进程不超过 2 个
  • 进程在开始时只创建一次,然后重用于执行进一步的调用(毕竟它是一个池)

让我们检查一下非常简约的文档:

running() 如果调用当前正在执行且无法取消,则返回 True。

它似乎反映了与取消Future对象未来执行的可能性相关的状态(因为它尚未正确初始化/连接到通信队列,并且仍然是时候取消它了),而不是进程本身的实际"正在运行"状态。

这可能就是源代码中这个注释在定义set_running_or_notify_cancel的含义:

将未来标记为正在运行或处理任何取消通知。

如果未来已被取消(cancel() 被调用并返回 True),则任何等待未来完成的线程(尽管调用 as_completed() 或 wait()))都会收到通知并返回 False。

如果 future 没有被取消,那么它就会处于运行状态(未来对 running() 的调用将返回 True),并且返回 True。

再一次,我们了解到,最好要求子流程协作,发布其状态,而不是试图使用记录不明确的方法来勒索它。

相关内容

  • 没有找到相关文章

最新更新