"Fire and forget" python async/await



有时需要进行一些非关键的异步操作,但我不想等待它完成。在Tornado的协程实现中,只需提交yield关键字,就可以"启动并忘记"异步函数。

我一直在试图弄清楚如何使用Python3.5中发布的新async/await语法来"fire&forget"。例如,一个简化的代码片段:

async def async_foo():
    print("Do some stuff asynchronously here...")
def bar():
    async_foo()  # fire and forget "async_foo()"
bar()

然而,bar()从未执行,相反,我们得到了一个运行时警告:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"

Upd:

如果您使用的是Python>=,请将asyncio.ensure_future替换为asyncio.create_task3.7这是一种更新、更好的生成任务的方式。


asyncio.Task到";火与遗忘

根据asyncio.Task的python文档,可以启动一些协同程序来执行";在背景中"。asyncio.ensure_future创建的任务不会阻止执行(因此函数将立即返回!)。这看起来像是一种";火与遗忘;按照您的要求。

import asyncio

async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()
    # btw, you can also create tasks inside non-async funcs
    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

如果任务在事件循环完成后执行,该怎么办

请注意,asyncio期望任务在事件循环完成时完成。因此,如果您将main()更改为:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget
    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

程序完成后,您将收到以下警告:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

为了防止这种情况,您可以在事件循环完成后等待所有挂起的任务:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget
    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    
    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

杀死任务而不是等待任务

有时你不想等待任务完成(例如,有些任务可能会被创建为永远运行)。在这种情况下,您可以只cancel()它们,而不是等待它们:

import asyncio
from contextlib import suppress

async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)

async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget
    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

输出:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo

输出:

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

这是一个简单的decorator函数,它将执行推到后台,并将控制行移到代码的下一行。

主要优点是,您不必将函数声明为await

import asyncio
import time
def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)
    return wrapped
@fire_and_forget
def foo():
    print("foo() started")
    time.sleep(1)
    print("foo() completed")
print("Hello")
foo()
print("I didn't wait for foo()")

注意:检查我的另一个答案,该答案使用纯thread而不使用asyncio也可以。

这并不完全是异步执行,但run_in_executor()可能适合您。

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        return loop.run_in_executor(None, task, *args, **kwargs)
    else:    
        raise TypeError('Task must be a callable')
def foo():
    #asynchronous stuff here

fire_and_forget(foo)

由于某些原因,如果您无法使用asyncio,那么下面是使用纯线程的实现。检查我的其他答案和Sergey的答案。

import threading, time
def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()
    return wrapped
@fire_and_forget
def foo():
    print("foo() started")
    time.sleep(1)
    print("foo() completed")
print("Hello")
foo()
print("I didn't wait for foo()")

产生

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed
def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        threading.Thread(target=functools.partial(f, *args, **kwargs)).start()
    return wrapped

是上面的更好版本——不使用异步

最新更新