Python多处理模块:使用超时连接进程



我正在对复杂模拟的参数进行优化。我正在使用多处理模块来增强优化算法的性能。我在http://pymotw.com/2/multiprocessing/basics.html.复杂的模拟持续不同的时间,取决于优化算法的给定参数,大约1到5分钟。如果参数选择得非常糟糕,模拟可能会持续30分钟或更长时间,并且结果没有用处。因此,我考虑在多处理中建立一个超时,终止所有持续时间超过定义时间的模拟。以下是问题的抽象版本:

import numpy as np
import time
import multiprocessing
def worker(num):
    
    time.sleep(np.random.random()*20)
def main():
    
    pnum = 10    
    
    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)
        
    for p in procs:
        p.join(5)
        print('stopping', p.name)
     
if __name__ == "__main__":
    main()

p.join(5)行定义了5秒的超时。由于for循环for p in procs:,程序等待5秒直到第一个进程完成,然后再等待5秒,直到第二个进程完成等等,但我希望程序终止所有持续5秒以上的进程。此外,如果没有一个进程的持续时间超过5秒,则程序不得等待这5秒。

您可以通过创建一个等待一些超时秒数的循环来实现这一点,经常检查是否所有进程都已完成。如果它们没有在分配的时间内全部完成,则终止所有进程:

TIMEOUT = 5 
start = time.time()
while time.time() - start <= TIMEOUT:
    if not any(p.is_alive() for p in procs):
        # All the processes are done, break now.
        break
    time.sleep(.1)  # Just to avoid hogging the CPU
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()

如果您想终止所有进程,您可以从多处理中使用池您需要为所有执行定义一个通用超时,而不是单独的超时。

import numpy as np
import time
from multiprocessing import Pool
def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime
def main():
    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)
    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)
    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print(pool_result.get(timeout=1))
if __name__ == "__main__":
    main()

这将为您提供一个列表,其中按顺序列出了所有员工的返回值
更多信息请点击此处:https://docs.python.org/2/library/multiprocessing.html#module-多访问.pool

感谢dano的帮助,我找到了一个解决方案:

import numpy as np
import time
import multiprocessing
def worker(num):
    time.sleep(np.random.random()*20)
def main():
    pnum = 10    
    TIMEOUT = 5 
    procs = []
    bool_list = [True]*pnum
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)
    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()
            
        print(bool_list)
            
        if np.any(bool_list):  
            time.sleep(.1)  
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()
            
    for p in procs:
        print('stopping', p.name,'=', p.is_alive())
        p.join()
if __name__ == "__main__":
    main()

这不是最优雅的方法,我相信有比使用bool_list更好的方法。超时5秒后仍处于活动状态的进程将被终止。如果您在worker函数中设置的时间比超时时间短,您将看到程序在达到5秒的超时时间之前停止。如果有:)

,我仍然对更优雅的解决方案持开放态度

相关内容

  • 没有找到相关文章

最新更新