如何使用带有超时的concurrent.futures



我正试图使用concurrent.futures模块在python3.2中获得超时。然而,当它超时时,并不会真正停止执行。我对线程和进程池执行器都进行了尝试,它们都没有停止任务,只有在任务完成后才会引发超时。那么有人知道这是否可行吗?

import concurrent.futures
import time
import datetime
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0;
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number
def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")
if __name__ == '__main__':
    main()

据我所知,TimeoutError实际上是在您期望的时候引发的,而不是在任务完成之后。

但是,程序本身将继续运行,直到所有正在运行的任务都完成为止。这是因为当前正在执行的任务(在您的情况下,可能是您提交的所有任务,因为您的池大小等于任务数量)实际上并没有被"杀死"。

会引发TimeoutError,因此您可以选择不等待任务完成(而是执行其他操作),但任务将继续运行直到完成。只要执行器的线程/子进程中有未完成的任务,python就不会退出。

据我所知,不可能仅仅"停止"当前正在执行的Futures,只能"取消"尚未启动的计划任务。在您的情况下,不会有任何线程,但假设您有5个线程/进程的池,并且您想要处理100个项目。在某个时刻,可能有20个已完成的任务、5个正在运行的任务和75个已计划的任务。在这种情况下,您可以取消这76个计划任务,但无论您是否等待结果,正在运行的4个任务都将持续到完成。

即使不能这样做,我想应该有办法达到你想要的最终结果。也许这个版本可以在路上帮助你(不确定它是否完全符合你的要求,但它可能有一些用处):

import concurrent.futures
import time
import datetime
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
class Task:
    def __init__(self, max_number):
        self.max_number = max_number
        self.interrupt_requested = False
    def __call__(self):
        print("Started:", datetime.datetime.now(), self.max_number)
        last_number = 0;
        for i in xrange(1, self.max_number + 1):
            if self.interrupt_requested:
                print("Interrupted at", i)
                break
            last_number = i * i
        print("Reached the end")
        return last_number
    def interrupt(self):
        self.interrupt_requested = True
def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
        tasks = [Task(num) for num in max_numbers]
        for task, future in [(i, executor.submit(i)) for i in tasks]:
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("this took too long...")
                task.interrupt()

if __name__ == '__main__':
    main()

通过为每个"任务"创建一个可调用对象,并将这些对象提供给执行器,而不仅仅是一个简单的函数,您可以提供一种"中断"任务的方法。提示:去掉task.interrupt()行,看看会发生什么,这可能会让我更容易理解上面的长解释;-)

最近我也遇到了这个问题,最后我使用ProcessPoolExecutor:提出了以下解决方案


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures._base.TimeoutError:
            print("This took to long...")
            stop_process_pool(executor)
def stop_process_pool(executor):
    for pid, process in executor._processes.items():
        process.terminate()
    executor.shutdown()

我的建议是使用ThreadPool而不是concurrent.futures。正如医生所说:

所有排入ThreadPoolExecutor队列的线程都将在解释器可以退出。请注意,执行此操作的出口处理程序是使用atexit添加的任何退出处理程序之前执行。这意味着必须捕获并处理主线程中的异常,才能向线程发出正常退出的信号。

在更复杂的情况下,整个程序会陷入困境。下面的片段已经足够表达我的意思了,尽管偏离了这个问题一点:

import concurrent.futures, time, datetime
from multiprocessing.pool import ThreadPool
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0
    i = 0
    while True:
        last_number = i * i
        i += 1
    return last_number
def origin():
    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
            try:
                for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                    print(future.result(timeout=1))
            except concurrent.futures._base.TimeoutError:
                print("This took to long...") # It suspends infinitely.
    except:
        print('Ending from origin.')
def update():
    try:
        with ThreadPool(len(max_numbers)) as pool:
            result = pool.map_async(run_loop, max_numbers)
            for num in result.get(2):
                print(num)
    except:
        print('Ending from update.')
if __name__ == '__main__':
    origin()
    # update()

相关内容

  • 没有找到相关文章

最新更新