我有一个用例,我必须处理一些文档,这需要一些时间。所以我尝试对文档进行批处理并对其进行多处理,效果很好,并且按预期在更短的时间内完成。处理文档还有多个阶段,我分别在所有阶段使用了多处理。当我触发多个并发请求来执行处理时,在处理了大约 70+ 个请求后,我注意到一些进程没有被杀死。
我正在用 locust 执行负载测试,我在其中创建了 5 个用户,等待时间为 4 - 5 秒,每个请求大约需要 3.5 秒,所以我尝试了多处理包和其他各种包装器(pebble、并行执行、pathos、concurrent.futures(。
基本上我做的是,
from multiprocessing import Pool
with Pool(processes=5) as p:
out = p.starmap(do_something, args)
p.close()
p.terminate()
此外,官方文件还说,在执行后,游泳池将在执行后关闭,同时这样做with
.当我停止触发请求时,最后一两个请求会停滞不前。我通过在该过程之前和之后打印"已开始{req_num}"和"已服务{req_num}"来找到这一点。在添加p.close()
和p.terminate()
之前,我可以看到停止触发请求后正在运行更多进程。添加它们后,仅不会提供最后一个触发的进程。现在,如果我开始触发请求并在一段时间后再次停止它们,则不会提供相同的最后一两个请求,并且它们的进程停滞不前。所以停滞的过程积累起来。
我提到的每个包装纸都有不同的关闭池的方法。我也试过了。就像悲哀一样,
p = Pool(processes=5)
out = p.map(do_something, args)
p.join()
p.close()
p.terminate()
有了concurrent.future.ThreadPoolExecutor
,它就p.shutdown()
.在所有其他包装器中,我都面临着同样的问题。在这里,停滞过程的数量超过了multiprocessing.Pool
我需要帮助来找到原因或正确的方法。任何帮助将不胜感激!
要正确关闭池,只需调用:
Pool.close() # terminate worker processes when all work already assigned has completed
Pool.join() # wait all processes to terminate