使用asyncio从celery worker收集结果



我有一个Python应用程序,它将一些处理工作卸载到一组芹菜工人。然后,主应用程序必须等待这些工人的结果。当一个worker的结果可用时,主应用程序将处理结果,并将调度更多的worker来执行。

我希望主应用程序以非阻塞方式运行。到目前为止,我有一个轮询函数来查看是否有任何工人的结果可用。

我正在考虑使用asyncio获取关于结果可用性的通知的可能性,以便我可以避免轮询。但是,我找不到任何关于如何做到这一点的信息。

任何关于这方面的建议都将非常感谢。

PS:我知道用gevent,我可以避免轮询。但是,我使用的是python3.4,因此更愿意避免使用gevent而使用asyncio。

您一定在寻找asyncio.as_completed(coros)。当不同协程的结果准备好时,它会产生结果。它返回一个迭代器,按照它们完成的顺序生成-。您可能还想看看它与asyncio.gather(*coros)有何不同,后者在提交给它的所有内容都完成后返回

import asyncio
from asyncio.coroutines import coroutine

@coroutine
def some_work(x, y):
    print("doing some background work")
    yield from asyncio.sleep(1.0)
    return x * y

@coroutine
def some_other_work(x, y):
    print("doing some background other work")
    yield from asyncio.sleep(3.0)
    return x + y

@coroutine
def as_when_completed():
    # give me results as and when they are ready
    coros = [some_work(2, 3), some_other_work(2, 3)]
    for futures in asyncio.as_completed(coros):
        res = yield from futures
        print(res)

@coroutine
def when_all_completed():
    # when everything is complete
    coros = [some_work(2, 3), some_other_work(2, 3)]
    results = yield from asyncio.gather(*coros)
    print(results)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # loop.run_until_complete(when_all_completed())
    loop.run_until_complete(as_when_completed())

实现celery workeron_finish函数向redis发布消息

然后在主应用程序中使用aioredis订阅通道,一旦得到通知,结果就准备好了

相关内容

  • 没有找到相关文章

最新更新