如何在循环运行时运行协程并等待它由同步函数产生?



我有一个类似傻瓜的代码:

def render():
loop = asyncio.get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
if loop.is_running():
result = asyncio.ensure_future(test())
else:
result = loop.run_until_complete(test())

loop不运行时非常简单,只需使用loop.run_until_complete它就会返回 coro 结果,但如果循环已经在运行(我的阻止代码在已经在运行循环的应用程序中运行),我无法使用loop.run_until_complete因为它会引发异常; 当我调用asyncio.ensure_future任务时,任务被安排并运行, 但我想在那里等待结果,有人知道如何做到这一点吗?文档不太清楚如何执行此操作。

我尝试在 coro 中传递一个concurrent.futures.Future调用set_result,然后在我的阻止代码上调用Future.result(),但它不起作用,它在那里阻塞并且不让其他任何东西运行。我的帮助将不胜感激。

要使用建议的设计实现runner,您需要一种方法来从其中运行的回调单步执行事件循环。Asyncio 明确禁止递归事件循环,因此这种方法是死胡同。

鉴于该约束,您有两个选择:

  1. 使render()本身成为协程;
  2. 在与运行 Asyncio 事件循环的线程不同的线程中执行render()(及其调用方)。

假设 #1 是不可能的,你可以像这样实现render()的 #2 变体:

def render():
loop = _event_loop  # can't call get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
future = asyncio.run_coroutine_threadsafe(test(), loop)
result = future.result()

请注意,不能在render中使用asyncio.get_event_loop(),因为没有(也不应该)为该线程设置事件循环。相反,生成运行器线程的代码必须调用asyncio.get_event_loop()并将其发送到线程,或者只是将其保留在全局变量或共享结构中。

同步等待异步协程

如果 asyncio 事件循环已经通过调用loop.run_forever运行,它将阻塞正在执行的线程,直到调用loop.stop[参见文档]。因此,同步等待的唯一方法是在专用线程上运行事件循环,在循环上调度异步函数,然后从另一个线程同步等待它。

为此,我根据user4815162342的答案编写了自己的最小解决方案。我还添加了用于在所有工作完成后清理循环的部件 [见loop.close]。

下面代码中的main函数在专用线程上运行事件循环,在事件循环上计划多个任务,以及要同步等待其结果的任务。同步等待将阻塞,直到所需结果准备就绪。最后,闭合循环并与其线程一起优雅地清理。

专用线程和函数stop_looprun_forever_safeawait_sync可以封装在模块或类中。

有关线程安全注意事项,请参阅 asyncio 文档中的"并发和多线程"部分。

import asyncio
import threading
#----------------------------------------
def stop_loop(loop):
''' stops an event loop '''
loop.stop()
print (".: LOOP STOPPED:", loop.is_running())
def run_forever_safe(loop):
''' run a loop for ever and clean up after being stopped '''
loop.run_forever()
# NOTE: loop.run_forever returns after calling loop.stop
#-- cancell all tasks and close the loop gracefully
print(".: CLOSING LOOP...")
# source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>
loop_tasks_all = asyncio.Task.all_tasks(loop=loop)
for task in loop_tasks_all: task.cancel()
# NOTE: `cancel` does not guarantee that the Task will be cancelled
for task in loop_tasks_all:
if not (task.done() or task.cancelled()):
try:
# wait for task cancellations
loop.run_until_complete(task)
except asyncio.CancelledError: pass
#END for
print(".: ALL TASKS CANCELLED.")
loop.close()
print(".: LOOP CLOSED:", loop.is_closed())
def await_sync(task):
''' synchronously waits for a task '''
while not task.done(): pass
print(".: AWAITED TASK DONE")
return task.result()
#----------------------------------------
async def asyncTask(loop, k):
''' asynchronous task '''
print("--start async task %s" % k)
await asyncio.sleep(3, loop=loop)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key
def main():
loop = asyncio.new_event_loop() # construct a new event loop
#-- closures for running and stopping the event-loop
run_loop_forever = lambda: run_forever_safe(loop)
close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)
#-- make dedicated thread for running the event loop
thread = threading.Thread(target=run_loop_forever)
#-- add some tasks along with my particular task
myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
for i in range(1, 10)]
#-- begin the thread to run the event-loop
print(".: EVENT-LOOP THREAD START")
thread.start()
#-- _synchronously_ wait for the result of my task
result = await_sync(myTask) # blocks until task is done
print("* final result of my task:", result) 
#... do lots of work ...
print("*** ALL WORK DONE ***")
#========================================
# close the loop gracefully when everything is finished
close_loop_safe()
thread.join()
#----------------------------------------
main()

这是我的情况,我的整个程序都是异步的,但是调用一些同步库,然后回调到我的异步函数。

按照用户4815162342的答案进行操作。

import asyncio
async def asyncTask(k):
''' asynchronous task '''
print("--start async task %s" % k)
# await asyncio.sleep(3, loop=loop)
await asyncio.sleep(3)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key

def my_callback():
print("here i want to call my async func!")
future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
return future.result()
def sync_third_lib(cb):
print("here will call back to your code...")
cb()
async def main():
print("main start...")
print("call sync third lib ...")
await asyncio.to_thread(sync_third_lib, my_callback)
# await loop.run_in_executor(None, func=sync_third_lib)
print("another work...keep async...")
await asyncio.sleep(2)
print("done!")

LOOP = asyncio.get_event_loop()
LOOP.run_until_complete(main())

最新更新