具有map&reduce风格的异步,并且不会淹没事件循环



我试图在实际应用程序中使用asyncio,但它不会那样做很简单,我们非常需要asyncio大师的帮助。

在没有泛滥事件循环的情况下生成其他任务的任务(成功!)

考虑这样一个任务:从一些"种子"网页开始爬行网络。每一个网页导致生成新的下载任务指数(!)进展。然而,我们既不想让事件循环泛滥,也不想超负荷我们的网络。我们想要控制任务流程。这就是我通过修改尼斯马克西姆的解决方案,可以很好地实现:https://mail.python.org/pipermail/python-list/2014-July/687823.html

地图,减少(失败)

我还需要一个很自然的东西,map() &reduce ()或者functools.reduce()如果我们已经在python3上。也就是说,我需要调用"汇总"函数来记录所有已完成的下载任务来自网页的链接。这就是我失败的地方:(

我建议用一个过于简化但仍然很好的测试来建模用例:让我们使用无效形式的斐波那契函数实现。也就是说,让coro_sum()应用于reduce(),而coro_fib是我们应用的对象map()。像这样:

@asyncio.coroutine
def coro_sum(x):
    return sum(x)
@asyncio.coroutine
def coro_fib(x):
    if x < 2:
        return 1
    res_coro =
executor_pool.spawn_task_when_arg_list_of_coros_ready(coro=coro_sum,
 arg_coro_list=[coro_fib(x - 1), coro_fib(x - 2)])
    return res_coro

这样我们可以运行以下测试

测试#1对一个工人:

executor_pool = ExecutorPool(workers=1)
executor_pool.as_completed( coro_fib(x) for x in range(20) )

测试#2对两个工人:

executor_pool = ExecutorPool(workers=2)
executor_pool.as_completed( coro_fib(x) for x in range(20) )

coro_fib()和coro_sum()是非常重要的调用是通过某个worker上的Task完成的,而不仅仅是隐式生成的和非托管!

发现asyncio大师对这个非常自然的目标感兴趣,这将是很酷的。非常感谢您的帮助和建议。

问好

Valery

有多种异步计算斐波那契数列的方法。首先,检查爆炸变量在您的情况下是否失败:

@asyncio.coroutine
def coro_sum(summands):
    return sum(summands)
@asyncio.coroutine
def coro_fib(n):
    if   n == 0: s = 0
    elif n == 1: s = 1
    else:
        summands, _ = yield from asyncio.wait([coro_fib(n-2), coro_fib(n-1)])
        s = yield from coro_sum(f.result() for f in summands)
    return s

您可以将summands替换为:

a = yield from coro_fib(n-2) # don't return until its ready
b = yield from coro_fib(n-1)
s = yield from coro_sum([a, b])

一般来说,为了防止指数增长,您可以使用asyncio.Queue(通过通信同步),asyncio.Semaphore(使用互斥锁同步)原语。

最新更新