为什么 multiprocessing.pool.Threadpool 的 terminate() 挂起



我想停止异步多处理作业与KeyboardInterrupt .但有时呼叫终止时会发生挂起。

from multiprocessing.pool import ThreadPool
import multiprocessing
import time
import queue
import inspect

def worker(index):
    print('{}: start'.format(index))
    for i in range(5):
        time.sleep(1)
    print('{}: stop'.format(index))
    return index, True

def wrapper(index, stopEvent, qResult):
    if stopEvent.is_set() is True:
        return index, False
    try:
        result = worker(index)
    except:
        print('*' * 50)
        return index, False
    else:
        if result[1] == True:
            qResult.put(result)
    return result

def watcher(qResult, stopEvent):
    cntQ = 0
    while True:
        try:
            result = qResult.get(timeout=10)
            qResult.task_done()
        except queue.Empty:
            if stopEvent.is_set() is True:
                break
        except KeyboardInterrupt:
            stopEvent.set()
        else:
            cntQ += 1
            print(result)
    qResult.join()
    qResult.close()
    print('qResult count:', cntQ)

def main():
    stopEvent = multiprocessing.Event()
    qResult = multiprocessing.JoinableQueue()
    qResult.cancel_join_thread()
    watch = multiprocessing.Process(target=watcher, args=(qResult, stopEvent))
    watch.start()
    pool = ThreadPool()
    lsRet = []
    for i in range(100000):
        try:
            ret = pool.apply_async(wrapper, args=(i, stopEvent, qResult))
            lsRet.append(ret)
        except KeyboardInterrupt:
            stopEvent.set()
            time.sleep(1)
            break
        if i+1 % 10 == 0:
            time.sleep(2)
    cntTotal = len(lsRet)
    cntRet = 0
    for ret in lsRet:
        if stopEvent.is_set():
            break
        try:
            ret.get()
        except KeyboardInterrupt:
            stopEvent.set()
            time.sleep(1)
        else:
            cntRet += 1
    if stopEvent.is_set() is False:
        stopEvent.set()
    print(inspect.stack()[0][1:4])
    if watch.is_alive() is True:
        watch.join()
    print(inspect.stack()[0][1:4])
    pool.terminate()                        # Why hang??????????
    print(inspect.stack()[0][1:4])
    pool.join()
    print(cntTotal, cntRet)

if __name__ == '__main__':
    main()

main()使用 multiprocessing.pool.Threadpool 异步调用一个watcher()线程和许多wrapper()线程。

wrapper()调用worker()并将其结果放入队列。

watcher()监视结果队列上方。

如果按 ctrl-c,则设置停止事件。

设置stopEvent后,wrapper()停止调用worker()Watcher()指示queue.EmptystopEvent并退出循环。

最后main()调用terminate()池。

有时

流程做得很好,但有时挂起。每次都不一样。

你应该把代码放在 try 除了块并捕获内置异常 键盘中断 请参阅此处的示例 捕获键盘中断

相关内容

  • 没有找到相关文章

最新更新