如何使多处理池不启动新进程,但也不终止当前运行的进程?



我在Python 2.7中使用Pythonmultiprocessing.Pool类。我有大量的工作只能在一天中的特定时间段运行。每一项工作都需要一些时间。我想限制作业一次最多并行运行n个。

Pool功能可以很好地限制并行作业的数量,但是当我试图减少作业时,它似乎有问题。当我在窗口结束时,我希望当前正在运行的作业完成它们的处理。我不希望有新的工作开始。我一直在尝试使用Pool.close()来做到这一点,这确实让我正在运行的进程按预期完成,但从实验中似乎可以看出,即使在池关闭后,队列中但尚未开始处理的作业仍将提交处理。

另一个选项,Pool.terminate()甚至主动关闭正在运行的作业,这违背了期望的行为。

tbody><<

首先,你不应该使用Python2.7,它已经被弃用一段时间了。

您应该使用concurrent.futures标准库中的ProcessPoolExecutor,并调用激活cancel_futures标志的.shutdown()方法,以让执行器完成已启动的作业,但取消任何挂起的工作。

from concurrent.futures import ProcessPoolExecutor
parallel_jobs = 4  # The pool size
executor = ProcessPoolExecutor(parallel_jobs)
future_1 = executor.submit(work_1, argument_1)
...
future_n = executor.submit(work_n, argument_n)
...
# At some point when the time window ends and you need to stop jobs:
executor.shutdown(cancel_futures=True)
# Some futures such as future_n may have  been cancelled here, you can check that:
if future_n.cancelled():
print("job n has been cancelled")
# Or you can try to get the result while checking for CancelledError:
try:
result_n = future_n.result()
except CancelledError:
print("job n hasn't finished in the given time window")

下面是一个取消的例子:

from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from time import sleep
# The job to execute concurrently
def foo(i: int) -> str:
sleep(0.2)
print(i)
return f"{i}"
e = ThreadPoolExecutor(4)
# Jobs are scheduled concurrently, this call does not block
futures = [e.submit(foo, i) for i in range(100)]
# Shutdown during execution and cancel pending jobs
e.shutdown(cancel_futures=True)
# Gather completed results
results = [f.result() for f in futures if not f.cancelled()]
print(results)

如果您执行这段代码,您将看到100个计划的作业并没有全部完成,只有一些是由于执行器在中间关闭而完成的。

相关内容

  • 没有找到相关文章

最新更新



  • All rights reserved © 2023 www.xiaobeizi.cn

  • 首页
Function允许正在运行的作业完成阻止新作业开始
.terminate ()没有
.close ()没有
所期望的行为/td>