Automatically await coroutines when their actual values are needed?

  • Tags: await python python-asyncio


Suppose I want to implement the following using asyncio:

def f():
    val1 = a()  # a() takes 1 sec
    val2 = b()  # b() takes 3 sec
    val3 = c(val1, val2)  # c() takes 1 sec, but must wait for a() and b() to finish
    val4 = d(val1)  # d() takes 1 sec, but must wait for a() to finish

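All of the functions a, b, c and d are asynchronous and can run in parallel. The optimal way to run this is: 1) run a() and b() in parallel; 2) when a() finishes, run d(); 3) when both a() and b() have finished, run c(). Altogether this takes 4 seconds.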
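The 4-second figure is the length of the longest dependency chain, b() followed by c() (3 + 1 seconds). A small sketch that computes the earliest finish time of each function, assuming each one starts as soon as all of its inputs are ready; the `durations` and `deps` dictionaries and the `finish_times` helper are hypothetical names used only for illustration:

```python
# Hypothetical encoding of the example: each function's own duration (seconds)
# and the functions whose results it needs.
durations = {"a": 1, "b": 3, "c": 1, "d": 1}
deps = {"a": [], "b": [], "c": ["a", "b"], "d": ["a"]}


def finish_times(durations, deps):
    """Earliest finish time of each node if it starts as soon as its inputs are ready."""
    done = {}

    def finish(name):
        if name not in done:
            # A node can start only after its slowest dependency has finished.
            start = max((finish(dep) for dep in deps[name]), default=0)
            done[name] = start + durations[name]
        return done[name]

    for name in durations:
        finish(name)
    return done


times = finish_times(durations, deps)
print(times)                # {'a': 1, 'b': 3, 'c': 4, 'd': 2}
print(max(times.values()))  # 4
```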

I found that implementing this with asyncio is not ideal:

import time
import asyncio

async def a():
    await asyncio.sleep(1)

async def b():
    await asyncio.sleep(3)

async def c(val1, val2):
    await val2  # only waits for b(); a() is awaited in d() instead
    await asyncio.sleep(1)

async def d(val1):
    await val1  # waits for a()
    await asyncio.sleep(1)

async def f():
    val1 = a()
    val2 = b()
    val3 = c(val1, val2)
    val4 = d(val1)
    return await asyncio.gather(val3, val4)

async def main():
    t1 = time.time()
    await f()
    t2 = time.time()
    print(t2 - t1)  # This will be 4 seconds indeed

asyncio.run(main())

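The implementation above works, but its main flaw is that I need to know that a() finishes before b(), so that I await val1 in d() rather than in c(). In other words, given a (possibly complex) execution graph, I have to know which functions finish before which others in order to put the "await" statements in the right places. If I await the same coroutine in two places, I get an exception.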
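A minimal sketch of the exception mentioned above, using a hypothetical coroutine `a()`: a bare coroutine object can be awaited only once, and awaiting it again raises `RuntimeError` (in CPython the message is typically "cannot reuse already awaited coroutine").

```python
import asyncio


async def a():
    await asyncio.sleep(0.1)
    return 1


async def main():
    coro = a()
    first = await coro   # the first await runs the coroutine to completion
    try:
        await coro       # a second await of the same coroutine object
    except RuntimeError as exc:
        return first, str(exc)


first, message = asyncio.run(main())
print(first, message)
```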

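My question is: is there a mechanism in asyncio (or another Python module) that automatically awaits coroutines, only at the point where they need to be resolved to actual values? I know such mechanisms are implemented in other parallel-execution frameworks.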

There are many ways to do this. One possibility is to use a synchronization primitive such as asyncio.Event. For example:

import time
import asyncio

val1 = None
val2 = None
event_a = None
event_b = None

async def a():
    global val1
    await asyncio.sleep(1)  # some computation
    val1 = 1
    event_a.set()

async def b():
    global val2
    await asyncio.sleep(3)
    val2 = 100
    event_b.set()

async def c():
    await event_a.wait()
    await event_b.wait()
    await asyncio.sleep(1)
    return val1 + val2

async def d():
    await event_a.wait()
    await asyncio.sleep(1)
    return val1 * 2

async def f():
    global event_a
    global event_b
    event_a = asyncio.Event()
    event_b = asyncio.Event()
    out = await asyncio.gather(a(), b(), c(), d())
    assert out[2] == 101
    assert out[3] == 2

async def main():
    t1 = time.time()
    await f()
    t2 = time.time()
    print(t2 - t1)

if __name__ == "__main__":
    asyncio.run(main())

Prints:

4.0029356479644775
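A variant of the same idea that avoids the global variables: an `asyncio.Future` created with `loop.create_future()` acts as both the event and the container for the value, since `set_result()` resumes every waiter and a Future can be awaited any number of times. This is a sketch assuming the same toy durations; passing the futures in as arguments is an illustrative choice, not part of the answer above:

```python
import asyncio
import time


async def a(out: asyncio.Future) -> None:
    await asyncio.sleep(1)  # some computation
    out.set_result(1)       # store the value and schedule every waiter to resume


async def b(out: asyncio.Future) -> None:
    await asyncio.sleep(3)
    out.set_result(100)


async def c(val1: asyncio.Future, val2: asyncio.Future) -> int:
    v1 = await val1  # a Future may be awaited by any number of coroutines
    v2 = await val2
    await asyncio.sleep(1)
    return v1 + v2


async def d(val1: asyncio.Future) -> int:
    v1 = await val1
    await asyncio.sleep(1)
    return v1 * 2


async def f():
    loop = asyncio.get_running_loop()
    val1, val2 = loop.create_future(), loop.create_future()
    _, _, out_c, out_d = await asyncio.gather(a(val1), b(val2), c(val1, val2), d(val1))
    return out_c, out_d


t1 = time.perf_counter()
result = asyncio.run(f())
elapsed = time.perf_counter() - t1
print(result)   # (101, 2)
print(elapsed)  # about 4 seconds
```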

Another option is to split the computation into more coroutines, for example:

import time
import asyncio

async def a():
    await asyncio.sleep(1)  # some computation
    return 1

async def b():
    await asyncio.sleep(3)
    return 100

async def c(val1, val2):
    await asyncio.sleep(1)
    return val1 + val2

async def d(val1):
    await asyncio.sleep(1)
    return val1 * 2

async def f():
    async def task1():
        params = await asyncio.gather(a(), b())  # <-- run a() and b() in parallel
        return await c(*params)
    async def task2():
        # Note: this calls a() a second time, independently of task1(), so a()
        # actually runs twice (concurrently, so the total is still 4 seconds).
        return await d(await a())
    out = await asyncio.gather(task1(), task2())
    assert out[0] == 101
    assert out[1] == 2

async def main():
    t1 = time.time()
    await f()
    t2 = time.time()
    print(t2 - t1)

if __name__ == "__main__":
    asyncio.run(main())

Prints:

4.00294041633606
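Closest to what the question asks for, one can also wrap each coroutine in an `asyncio.Task` with `asyncio.create_task()`. A Task starts running as soon as it is created, runs only once, and, unlike a bare coroutine, can be awaited from any number of places; awaiting a finished Task simply returns its result. Each function can therefore await exactly the inputs it needs without knowing which one finishes first. A sketch with the same toy functions (an alternative approach, not one of the two answers above):

```python
import asyncio
import time


async def a():
    await asyncio.sleep(1)  # some computation
    return 1


async def b():
    await asyncio.sleep(3)
    return 100


async def c(val1, val2):
    # val1 and val2 are Tasks: awaiting one that already finished just returns
    # its result, so completion order and other awaiters do not matter.
    total = (await val1) + (await val2)
    await asyncio.sleep(1)
    return total


async def d(val1):
    doubled = (await val1) * 2
    await asyncio.sleep(1)
    return doubled


async def f():
    val1 = asyncio.create_task(a())  # a() is scheduled now and runs only once
    val2 = asyncio.create_task(b())
    val3 = asyncio.create_task(c(val1, val2))
    val4 = asyncio.create_task(d(val1))
    return await asyncio.gather(val3, val4)


t1 = time.perf_counter()
out = asyncio.run(f())
elapsed = time.perf_counter() - t1
print(out)      # [101, 2]
print(elapsed)  # about 4 seconds
```

Because c() awaits both val1 and val2, the await statements no longer have to be placed according to which function finishes first, and a() runs only once, unlike the task2() version above, which calls a() a second time.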
