python多处理.池终止*特定*长时间运行或挂起的进程



我需要执行一个由许多并行数据库连接和查询组成的池。我想使用多处理。Pool或concurrent.futures ProcessPoolExecutor。Python 2.7.5

在某些情况下,查询请求花费的时间太长或永远不会完成(挂起/僵尸进程)。我想从多处理中终止特定于进程。Pool或concurrent.futures ProcessPoolExecutor已超时。

这里有一个如何杀死/重新生成整个进程池的例子,但理想情况下,我会尽量减少CPU抖动,因为我只想杀死一个在超时秒后没有返回数据的特定长时间运行的进程。

由于某些原因,在返回并完成所有结果后,下面的代码似乎无法终止/加入进程池。当超时发生时,这可能与终止工作进程有关,但是当终止工作进程时,池会创建新的工作进程,并且结果与预期一致。

from multiprocessing import Pool
import time
import numpy as np
from threading import Timer
import thread, time, sys
def f(x):
    time.sleep(x)
    return x
if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)
    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]
    while results:
        try:
            x, result = results.pop(0)
            start = time.time()
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            for p in pool._pool:
                if p.exitcode is None:
                    p.terminate()
    pool.terminate()
    pool.join()

我没有完全理解你的问题。你说你想停止一个特定的进程,但在异常处理阶段,你正在对所有作业调用terminate。不知道你为什么这么做。此外,我确信使用multiprocessing.Pool中的内部变量是不太安全的。说了这么多,我想你的问题是,为什么这个节目在暂停时没有结束。如果这就是问题所在,那么下面的方法就能奏效:

from multiprocessing import Pool
import time
import numpy as np
from threading import Timer
import thread, time, sys
def f(x):
    time.sleep(x)
    return x
if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)
    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]
    result = None
    start = time.time()
    while results:
        try:
            x, result = results.pop(0)
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            for i in reversed(range(len(pool._pool))):
                p = pool._pool[i]
                if p.exitcode is None:
                    p.terminate()
                del pool._pool[i]
    pool.terminate()
    pool.join()

重点是您需要从池中删除项目;仅仅调用terminate是不够的。

在您的解决方案中,您正在篡改池本身的内部变量。池依赖于3个不同的线程才能正确操作,在不知道自己在做什么的情况下干预它们的内部变量是不安全的。

在标准Python池中,没有一种干净的方法可以停止超时进程,但也有一些替代实现可以公开这种功能。

您可以查看以下库:

卵石

台球

为了避免访问内部变量,可以将执行任务中的multiprocessing.current_process().pid保存到共享内存中。然后对来自主进程的multiprocessing.active_children()进行迭代,并杀死目标pid(如果存在)
但是,在外部终止工作者之后,会重新创建他们,但池变得不可加入,并且还需要在join() 之前显式终止

我也遇到了这个问题。

原始代码和@stacksia编辑的版本有相同的问题:在这两种情况下,当其中一个进程达到超时时(即pool._pool上的循环完成时),它将杀死所有当前运行的进程。

在下面查找我的解决方案。它包括按照@luart的建议为每个工作进程创建一个.pid文件。如果有一种方法可以标记每个工作进程,它就会起作用(在下面的代码中,x完成了这项工作)。如果有人有更优雅的解决方案(例如将PID保存在内存中),请分享。

#!/usr/bin/env python
from multiprocessing import Pool
import time, os
import subprocess
def f(x):
    PID = os.getpid()
    print 'Started:', x, 'PID=', PID
    pidfile = "/tmp/PoolWorker_"+str(x)+".pid"
    if os.path.isfile(pidfile):
        print "%s already exists, exiting" % pidfile
        sys.exit()
    file(pidfile, 'w').write(str(PID))
    # Do the work here
    time.sleep(x*x)
    # Delete the PID file
    os.remove(pidfile)
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=3, maxtasksperchild=4)
    results = [(x, pool.apply_async(f, (x,))) for x in [1,2,3,4,5,6]]
    pool.close()
    while results:
        print results
        try:
            x, result = results.pop(0)
            start = time.time()
            print result.get(timeout=3), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            # We know which process gave us an exception: it is "x", so let's kill it!
            # First, let's get the PID of that process:
            pidfile = '/tmp/PoolWorker_'+str(x)+'.pid'
            PID = None
            if os.path.isfile(pidfile):
                PID = str(open(pidfile).read())
                print x, 'pidfile=',pidfile, 'PID=', PID
            # Now, let's check if there is indeed such process runing:
            for p in pool._pool:
                print p, p.pid
                if str(p.pid)==PID:
                    print 'Found  it still running!', p, p.pid, p.is_alive(), p.exitcode
                    # We can also double-check how long it's been running with system 'ps' command:"
                    tt = str(subprocess.check_output('ps -p "'+str(p.pid)+'" o etimes=', shell=True)).strip()
                    print 'Run time from OS (may be way off the real time..) = ', tt
                    # Now, KILL the m*$@r:
                    p.terminate()
                    pool._pool.remove(p)
                    pool._repopulate_pool()
                    # Let's not forget to remove the pidfile
                    os.remove(pidfile)
                    break
    pool.terminate()
    pool.join()

许多人建议用卵石。它看起来不错,但仅适用于Python 3。如果有人能为python 2.6导入pebble,那就太好了。

相关内容

  • 没有找到相关文章

最新更新