我有一个Python应用程序,它将一些处理工作卸载到一组芹菜工人。然后,主应用程序必须等待这些工人的结果。当一个worker的结果可用时,主应用程序将处理结果,并将调度更多的worker来执行。
我希望主应用程序以非阻塞方式运行。到目前为止,我有一个轮询函数来查看是否有任何工人的结果可用。
我正在考虑使用asyncio获取关于结果可用性的通知的可能性,以便我可以避免轮询。但是,我找不到任何关于如何做到这一点的信息。
任何关于这方面的建议都将非常感谢。
PS:我知道用gevent,我可以避免轮询。但是,我使用的是python3.4,因此更愿意避免使用gevent而使用asyncio。
您一定在寻找asyncio.as_completed(coros)
。当不同协程的结果准备好时,它会产生结果。它返回一个迭代器,按照它们完成的顺序生成-。您可能还想看看它与asyncio.gather(*coros)
有何不同,后者在提交给它的所有内容都完成后返回
import asyncio
from asyncio.coroutines import coroutine
@coroutine
def some_work(x, y):
print("doing some background work")
yield from asyncio.sleep(1.0)
return x * y
@coroutine
def some_other_work(x, y):
print("doing some background other work")
yield from asyncio.sleep(3.0)
return x + y
@coroutine
def as_when_completed():
# give me results as and when they are ready
coros = [some_work(2, 3), some_other_work(2, 3)]
for futures in asyncio.as_completed(coros):
res = yield from futures
print(res)
@coroutine
def when_all_completed():
# when everything is complete
coros = [some_work(2, 3), some_other_work(2, 3)]
results = yield from asyncio.gather(*coros)
print(results)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# loop.run_until_complete(when_all_completed())
loop.run_until_complete(as_when_completed())
实现celery worker
的on_finish
函数向redis
发布消息
然后在主应用程序中使用aioredis
订阅通道,一旦得到通知,结果就准备好了