是否使用异步流在TCP服务器上保持无限连接打开



我正在努力了解如何对多个连接使用异步流,这些连接将一直发送消息,直到达到预定义条件或套接字超时。查看Python文档,它们为基于异步流的TCP服务器提供了以下示例:

import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())

我试图做的事情更复杂,看起来更像这样(很多都是伪代码,用大写字母写或省略了实现(:

import asyncio
async def io_control(queue):
while true:
...
# do I/O control in this function ... 
async def data_processing(queue):
while true:
...
# perform data handling
async def handle_data(reader, writer):
data = await reader.read()
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")

#do stuff with a queue - pass messages to other two async functions as needed        
#keep open until something happens
if(ERROR or SOCKET_TIMEOUT):
writer.close()
async def server(queue):
server = await asyncio.start_server(
handle_data, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
async def main():
queue_io = asyncio.Queue()
queue_data = asyncio.Queue()

asyncio.run(server(queue_data))
asyncio.run(data_handling(queue_data))
asyncio.run(io_control(queue_io))
asyncio.run(main())

这看起来可行吗?我不习惯使用协同例程(我更多地来自多线程范式(,所以我不确定我所做的是否正确,或者我是否必须明确地包括收益或做任何额外的事情。

如果我理解正确,您只需要TCP服务器能够处理多个并发连接。start_server函数应该已经为您提供了所需的一切。

第一个参数client_connected_cb是每当客户端建立连接时调用的协程函数。如果在该函数中引入一个循环(在示例代码handle_data中(,则可以保持连接打开,直到满足某些条件。到底是什么条件应该导致关闭连接取决于您,实现细节显然取决于此。我能想到的最简单的方法是这样的:

import asyncio
import logging
log = logging.getLogger(__name__)
async def handle_data(reader, writer):
while True:
data = (await reader.readline()).decode().strip()
if not data:
log.debug("client disconnected")
break
response = await your_data_processing_function(data)
writer.write(response.encode())
await writer.drain()
...
async def main():
server = await asyncio.start_server(handle_data, '127.0.0.1', 8888)
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())

理论上,并发连接的数量没有限制。

如果client_connected_cb是一个协程函数,那么每个新连接都会为事件循环安排一个新任务。这就是并发性的来源。魔术发生在await从客户端接收新数据的时候;这就是事件循环可以将执行切换到另一个协同程序的地方。可以说,这一切都发生在幕后。

如果要引入超时,可以将不可用的readline协程封装在wait_for中,然后捕获退出循环的TimeoutError

希望这能有所帮助。

最新更新