Asyncio任务被意外延迟



我一直在努力学习一些关于asyncio,我有一些意想不到的行为。我已经设置了一个简单的fibonacci服务器,它支持使用流的多个连接。fib计算是递归编写的,所以我可以通过输入一个大的数字来模拟长时间运行的计算。正如预期的那样,长时间运行的计算会阻塞I/O,直到长时间运行的计算完成。

问题来了。我把斐波那契函数重写成了协程。我期望通过每个递归的产生,控制将回落到事件循环,等待I/O任务将有机会执行,您甚至可以并发地运行多个fib计算。然而,情况似乎并非如此。

代码如下:

import asyncio
@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    a = yield from fib(n-1)
    b = yield from fib(n-2)
    return a + b

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = yield from fib(n)
        writer.write('{}n'.format(result).encode('ascii'))
        yield from writer.drain()
    writer.close()
    print("Closed")

def server(address):
    loop = asyncio.get_event_loop()
    fib_server = asyncio.start_server(fib_handler, *address, loop=loop)
    fib_server = loop.run_until_complete(fib_server)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('closing...')
        fib_server.close()
        loop.run_until_complete(fib_server.wait_closed())
        loop.close()

server(('', 25000))

如果您netcat到端口25000并开始输入数字,此服务器运行得非常好。但是,如果开始长时间运行的计算(例如35),则在第一个计算完成之前不会运行其他计算。事实上,额外的连接甚至不会被处理。

我知道事件循环正在反馈递归fib调用的收益,因此控制必须一直下降。但我认为循环将处理I/O队列中的其他调用(例如生成第二个fib_handler),然后再"蹦床"回到fib函数。

我敢肯定我一定是误解了什么,或者是我忽视了某种bug,但我怎么也找不到它。

第一个问题是您在fib_handler内部调用yield from fib(n)。包括yield from意味着fib_handler将阻塞,直到对fib(n)的调用完成,这意味着在fib运行时它无法处理您提供的任何输入。即使您所做的只是在fib内部进行I/O操作,也会遇到这个问题。为了解决这个问题,你应该使用asyncio.async(fib(n))(或者最好是asyncio.ensure_future(fib(n)),如果你有一个新的Python版本)来调度fib与事件循环,而不实际阻塞fib_handler。从那里,您可以使用Future.add_done_callback将结果写入客户端,当它准备好了:

import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    a = yield from fib(n-1)
    b = yield from fib(n-2)
    return a + b
def do_it(writer, result):
    writer.write('{}n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    executor = ProcessPoolExecutor(4)
    loop = asyncio.get_event_loop()
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = asyncio.async(fib(n))
        # Write the result to the client when fib(n) is done.
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")

也就是说,这个变化本身仍然不能完全解决问题;虽然它允许多个客户端同时连接和发出命令,但单个客户端仍将获得同步行为。这是因为当您在协程函数上直接调用yield from coro()时,直到coro()(或由coro调用的另一个协程)实际执行了一些非阻塞I/O之后,控制权才交还给事件循环。否则,Python将只执行coro而不放弃控制。这是一个有用的性能优化,因为当协程实际上不打算阻塞I/O时,将控制权交给事件循环是浪费时间,特别是考虑到Python的高函数调用开销。

在你的例子中,fib从不做任何I/O,所以一旦你在fib内部调用yield from fib(n-1),事件循环就不会再次运行,直到它完成递归,这将阻止fib_handler从客户端读取任何后续输入,直到对fib的调用完成。将所有fib的调用包装在asyncio.async中,保证每次调用yield from asyncio.async(fib(...))时都将控制权交给事件循环。当我进行此更改时,除了在fib_handler中使用asyncio.async(fib(n))之外,我还能够并发地处理来自单个客户机的多个输入。下面是完整的示例代码:

import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    a = yield from fib(n-1)
    b = yield from fib(n-2)
    return a + b
def do_it(writer, result):
    writer.write('{}n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    executor = ProcessPoolExecutor(4)
    loop = asyncio.get_event_loop()
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = asyncio.async(fib(n))
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")

客户端输入/输出:

dan@dandesk:~$ netcat localhost 25000
35 # This was input
4  # This was input
8  # output
24157817 # output

现在,即使这个工作,我也不会使用这个实现,因为它在单线程程序中做了一堆cpu绑定的工作,也想在同一个线程中提供I/O服务。这不会很好地扩展,也不会有理想的性能。相反,我建议使用loop.run_in_executor在后台进程中运行对fib的调用,这允许asyncio线程满负荷运行,并且还允许我们跨多个核心扩展对fib的调用:

import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
def fib(n):
    if n < 1:
        return 1
    a = fib(n-1)
    b = fib(n-2)
    return a + b
def do_it(writer, result):
    writer.write('{}n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    executor = ProcessPoolExecutor(8)  # 8 Processes in the pool
    loop = asyncio.get_event_loop()
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = loop.run_in_executor(executor, fib, n)
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")

最新更新