如何在异步服务器中实现超时



下面是一个简单的回显服务器。但是,如果客户端在 10 秒内没有发送任何内容,我想关闭连接。

import asyncio

async def process(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    print("awaiting for data")
    line = await reader.readline()
    print(f"received {line}")
    writer.write(line)
    print(f"sent {line}")
    await writer.drain()
    print(f"Drained")

async def timeout(task: asyncio.Task, duration):
    print("timeout started")
    await asyncio.sleep(duration)
    print("client unresponsive, cancelling")
    task.cancel()
    print("task cancelled")

async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    await task
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")

async def a_main():
    server = await asyncio.start_server(new_session, port=8088)
    await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(a_main())

如果客户端发送消息,则工作正常。但另一种情况,当客户端保持沉默时,它不起作用

当客户端发送消息时:

new session started
awaiting for data
timeout started
received b'slkdfjsdlkfjrn'
sent b'slkdfjsdlkfjrn'
Drained
task complete
timer cancelled
writer closed

当客户端在打开连接后保持沉默时

new session started
awaiting for data
timeout started
client unresponsive, cancelling
task cancelled

没有task completetimer cancelledwriter closed

  1. 上面的代码有什么问题?
  2. 有没有更好的方法来实现超时?

更新

找出问题所在,看起来任务实际上被取消了,但异常被默默忽略了,通过捕获CancelledError修复了问题

async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    try:
        await task
    except asyncio.CancelledError:
        print(f"Task took too long and was cancelled by timer")
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")

第二部分仍然存在。有没有更好的方法来实现超时?


更新2

使用 wait_for 完成代码。不再需要超时代码。检查下面接受的解决方案:

async def new_session(reader, writer):
    print("new session started")
    try:
        await asyncio.wait_for(process(reader, writer), timeout=5)
    except asyncio.TimeoutError as te:
        print(f'time is up!{te}')
    finally:
        writer.close()
        print("writer closed")

我在建立连接时使用以下代码。我建议对代码使用类似的wait_for。

fut = asyncio.open_connection( self.host, self.port, loop=self.loop )
try:
   r, w = await asyncio.wait_for(fut, timeout=self.connection_timeout)
except asyncio.TimeoutError:
   pass

有没有更好的方法来实现超时?

您可以使用asyncio.wait_for而不是timeout。它具有类似的语义,但已经带有 asyncio。此外,您可以等待将来它返回以检测是否已发生超时。

最新更新