假设我有一些异步的coroutine,可以获取一些数据并返回它。这样:
async def fetch_data(*args):
result = await some_io()
return result
基本上,此coroutine是从Coroutines链中调用的,并且最初的Coroutine是通过创建任务来运行的。但是,如果出于测试目的,我只想在运行某个文件时仅此方式运行一个coroutine:
if __name__ == '__main__':
result = await fetch_data(*args)
print(result)
显然,我无法做到这一点,因为我试图从非Coroutine功能中奔跑并等待Coroutine。因此,问题是:是否有一些正确的方法可以从Coroutine获取数据而不调用功能?我可以为result
做一些Future
对象并等待它,但是也许还有其他一些更简单,更清晰的方法?
您将需要创建一个事件循环以运行您的coroutine:
import asyncio
async def async_func():
return "hello"
loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_func())
loop.close()
print(result)
或作为函数:
def run_coroutine(f, *args, **kwargs):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(f(*args, **kwargs))
loop.close()
return result
这样使用:
print(run_coroutine(async_func))
或:
assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")
对于Python 3.7 ,您可以使用asyncio.run()
:
import asyncio
async def fetch_data():
return "sample_data"
asyncio.run(fetch_data())