Python asyncio event loop: returning a response for partially completed tasks while still finishing the tasks after the response



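I'm using Sanic, but I'm a bit stuck. I'm calling 3 different APIs, each with its own response time.

I want to create a timeout function that gives each task an acceptable amount of time to return. However, if a task doesn't finish within the acceptable time, I want to return partial data, since I don't need the complete dataset and speed is more of a priority.

However, I want to keep the unfinished tasks working until they complete (i.e. requesting the API data and inserting it into a Postgres db).

I'd like to know whether we can do this without using some kind of scheduler to keep the tasks running in the background.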

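"However, if a task doesn't finish within the acceptable time, I want to return partial data, since I don't need the complete dataset and speed is more of a priority."

"However, I want to keep the unfinished tasks working until they complete"

So the other tasks are independent of the state of the timed-out task, right? If I understand you correctly, you just want to run 3 asyncio.Task objects, each with its own timeout, and aggregate the results at the end.

The only possible issue I see is "want to return partial data", since how that data is organized can vary a lot. But we can probably just pass this "partial data" along inside an exception raised in the task when it is cancelled on timeout.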

Here's a small prototype:

import asyncio

class PartialData(Exception):
    def __init__(self, data):
        super().__init__()
        self.data = data        

async def api_job(i):
    data = 'job {i}:'.format(i=i)
    try:
        await asyncio.sleep(1)
        data += ' step 1,'
        await asyncio.sleep(2)
        data += ' step 2,'
        await asyncio.sleep(2)
        data += ' step 3.'
    except asyncio.CancelledError as exc:
        raise PartialData(data)  # Pass partial data to outer code with our exception.
    else:
        return data

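async def api_task(i, timeout):
    """Wrapper for api_job to run it with a timeout and retrieve its partial data on timeout."""
    t = asyncio.ensure_future(api_job(i))
    try:
        return await asyncio.wait_for(t, timeout)
    except PartialData as exc:
        # Recent Python versions re-raise the exception the cancelled job
        # ended with, so the partial data arrives here directly.
        return exc.data
    except asyncio.TimeoutError:
        # Older versions raise TimeoutError instead; the job has been
        # cancelled, so awaiting it yields our PartialData exception.
        try:
            await t
        except PartialData as exc:
            return exc.data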

async def main():
    # Run 3 jobs with different timeouts:
    results = await asyncio.gather(
        api_task(1, timeout=2), 
        api_task(2, timeout=4), 
        api_task(3, timeout=6),
    )
    # Print results including "partial data":
    for res in results:
        print(res)

if __name__ == '__main__':
    # asyncio.run() creates the loop, shuts down async generators and closes it.
    asyncio.run(main())

Output:

job 1: step 1,
job 2: step 1, step 2,
job 3: step 1, step 2, step 3.

(As you can see, the first two jobs hit their timeouts and only part of their data was retrieved.)
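Note that the prototype above cancels a job once its timeout expires. If the job should keep running after the timeout (for example, to finish its database insert), one possible variation is to wrap the task in asyncio.shield() so that wait_for() only cancels the wrapper. This is only a minimal sketch: api_job's delay parameter, fetch_with_timeout and the saved list (a stand-in for Postgres) are hypothetical names for illustration, not part of the original code.

```python
import asyncio

saved = []  # hypothetical stand-in for rows inserted into Postgres


async def api_job(i, delay):
    """Hypothetical job: simulate a slow API call, then 'store' the result."""
    await asyncio.sleep(delay)
    result = 'job {i} data'.format(i=i)
    saved.append(result)
    return result


async def fetch_with_timeout(task, timeout):
    """Return the task's result, or None if it is not ready within timeout.

    asyncio.shield() keeps wait_for() from cancelling the task itself,
    so a timed-out job continues in the background.
    """
    try:
        return await asyncio.wait_for(asyncio.shield(task), timeout)
    except asyncio.TimeoutError:
        return None


async def main():
    tasks = [asyncio.ensure_future(api_job(i, delay))
             for i, delay in ((1, 0.1), (2, 0.5))]
    # Job 1 finishes within 0.2 s, job 2 does not.
    partial = await asyncio.gather(
        *(fetch_with_timeout(t, 0.2) for t in tasks)
    )
    # Job 2 is still running here; wait for it before the loop closes.
    await asyncio.gather(*tasks)
    return partial


if __name__ == '__main__':
    print(asyncio.run(main()))
    print(saved)
```

Here the partial response contains None for the slow job, while saved still ends up holding both results, because the slow job was never cancelled. In a long-running server the final gather would not be needed, since the loop stays alive anyway.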

Update:

A more complex example containing possible solutions for the different situations:

import asyncio
from contextlib import suppress

async def stock1(_):
    await asyncio.sleep(1)
    return 'stock1 res'

async def stock2(exception_in_2):
    await asyncio.sleep(1)
    if exception_in_2:
        raise ValueError('Exception in stock2!')
    await asyncio.sleep(1)
    return 'stock2 res'

async def stock3(_):
    await asyncio.sleep(3)
    return 'stock3 res'

async def main():
    # Vary these values to see different situations:
    timeout = 2.5
    exception_in_2 = False

    # To run all three stocks just create tasks for them:
    tasks = [
        asyncio.ensure_future(s(exception_in_2)) 
        for s 
        in (stock1, stock2, stock3)
    ]

    # Now we just wait until one of these situations happens:
    # 1) Everything is done
    # 2) An exception occurred in one of the tasks
    # 3) The timeout occurred and at least two tasks are ready
    # 4) The timeout occurred and fewer than two tasks are ready
    # ( https://docs.python.org/3/library/asyncio-task.html#asyncio.wait )
    await asyncio.wait(
        tasks, 
        timeout=timeout, 
        return_when=asyncio.FIRST_EXCEPTION
    )
    is_success = all(t.done() and not t.exception() for t in tasks)
    is_exception = any(t.done() and t.exception() for t in tasks)
    is_good_timeout = (
        not is_success and
        not is_exception and
        sum(t.done() for t in tasks) >= 2
    )
    is_bad_timeout = (
        not is_success and
        not is_exception and
        sum(t.done() for t in tasks) < 2
    )

    # If success, just print all results:
    if is_success:
        print('All done before timeout:')
        for t in tasks:
            print(t.result())
    # If timeout, but at least two tasks are done,
    # print their results and leave the pending task running.
    # But note two important things:
    # 1) You should guarantee the pending task is done before the loop is closed
    # 2) What if the pending task finishes with an error, is that ok?
    elif is_good_timeout:
        print('Timeout, but enough tasks done:')
        for t in tasks:
            if t.done():
                print(t.result())
    # Timeout and not enough tasks done,
    # let's just cancel everything still hanging:
    elif is_bad_timeout:
        await cancel_and_retrieve(tasks)
        raise RuntimeError('Timeout and not enough tasks done')  # You probably want to indicate failure
    # If any of the tasks finished with an exception,
    # we should probably cancel the unfinished tasks,
    # await all tasks and retrieve all exceptions to prevent warnings
    # ( https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed )
    elif is_exception:
        await cancel_and_retrieve(tasks)
        raise RuntimeError('Exception in one of tasks')  # You probably want to indicate failure

async def cancel_and_retrieve(tasks):
    """
    Cancel all pending tasks and retrieve all exceptions
    ( https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed )
    It's a cleanup function for when we don't want the tasks to continue.
    """
    for t in tasks:
        if not t.done():
            t.cancel()
    await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
    for t in tasks:
        # CancelledError is not an Exception subclass since Python 3.8,
        # so it has to be suppressed explicitly.
        with suppress(Exception, asyncio.CancelledError):
            await t

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # If some tasks are still pending (the is_good_timeout case),
        # let's kill them. asyncio.wait() rejects an empty set, hence the check.
        pending = asyncio.all_tasks(loop)
        if pending:
            loop.run_until_complete(cancel_and_retrieve(pending))
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
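The example above kills whatever is still pending at exit. If you would rather let the pending tasks finish (note 1 in the comments), a rough sketch could look like the following. It relies on the fact that asyncio.wait() does not cancel anything on timeout. The stock and respond helpers and the shortened delays are assumptions made for illustration only.

```python
import asyncio


async def stock(name, delay):
    """Hypothetical API call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return '{} res'.format(name)


async def respond(tasks, timeout):
    """Collect the results of the tasks that finished within timeout.

    asyncio.wait() does not cancel anything on timeout, so the tasks
    returned in `pending` keep running.
    """
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    # Note: t.result() re-raises if a finished task failed.
    results = [t.result() for t in tasks if t in done]
    return results, pending


async def main():
    tasks = [
        asyncio.ensure_future(stock(name, delay))
        for name, delay in (('stock1', 0.1), ('stock2', 0.2), ('stock3', 0.6))
    ]
    results, pending = await respond(tasks, timeout=0.4)
    # ... the partial response would be sent here ...
    # Before shutdown, let the unfinished tasks run to completion;
    # return_exceptions=True keeps a late failure from propagating.
    late = await asyncio.gather(*pending, return_exceptions=True)
    return results, late


if __name__ == '__main__':
    print(asyncio.run(main()))
```

With these delays the partial response holds the results of stock1 and stock2, and stock3 completes afterwards instead of being cancelled.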
