Python等待多处理池中的过程完成,而无需关闭池或使用MAP()



我有一个像

的代码作品
pool = multiprocessing.Pool(10)
for i in range(300):
    for m in range(500):
        data = do_some_calculation(resource)
        pool.apply_async(paralized_func, data, call_back=update_resource)
    # need to wait for all processes finish
    # {...}
    # Summarize resource
    do_something_with_resource(resource)

基本上我有2个循环。我在循环外一次启动处理池以避免过热。在第二循环结束时,我想总结所有过程的结果。

问题是,由于data输入的变化,我无法使用pool.map()等待。我不能使用pool.join()pool.close(),因为我仍然需要在第1循环的下一个迭代中使用pool

在这种情况下,等待进程完成的好方法是什么?

我尝试检查pool._cache在第二循环结束时。

while len(process_pool._cache) > 0:
    sleep(0.001)

以这种方式有效,但看起来很奇怪。有更好的方法吗?

apply_async将返回 AsyncResult对象。此对象具有方法wait([timeout]),您可以使用它。

示例:

pool = multiprocessing.Pool(10)
for i in range(300):
    results = []
    for m in range(500):
        data = do_some_calculation(resource)
        result = pool.apply_async(paralized_func, data, call_back=update_resource)
        results.append(result)
    [result.wait() for result in results]
    # need to wait for all processes finish
    # {...}
    # Summarize resource
    do_something_with_resource(resource)

我尚未检查此代码,因为它不能执行,但应该起作用。

有一个问题的问题

[result.wait() for result in results]

如果某些工人提出例外,则不会用作障碍。例外认为足够的案例可以进一步进行等待((。这可能检查所有工人是否完成处理。

while True:
    time.sleep(1)
    # catch exception if results are not ready yet
    try:
        ready = [result.ready() for result in results]
        successful = [result.successful() for result in results]
    except Exception:
        continue
    # exit loop if all tasks returned success
    if all(successful):
        break
    # raise exception reporting exceptions received from workers
    if all(ready) and not all(successful):
        raise Exception(f'Workers raised following exceptions {[result._value for result in results if not result.successful()]}')

,或者您可以使用回调记录您获得了多少返回。

pool = multiprocessing.Pool(10)
for i in range(300):
    results = 0
    for m in range(500):
        data = do_some_calculation(resource)
        result = pool.apply_async(paralized_func, data, call_back=lambda x: results+=1; )
        results.append(result)
    
    # need to wait for all processes finish
    while results < 500:
        pass
    # Summarize resource
    do_something_with_resource(resource)

相关内容

  • 没有找到相关文章

最新更新