Asyncio Streams自动重新启动读取数据,即使在writer.close()运行之后 &g



我按照https://docs.python.org/3/library/asyncio-stream.html中的教程从传感器(TCP协议,PC作为服务器)读取数据:

import datetime
import asyncio
import socket
async def connect_to_sq():
_ip = "0.0.0.0"
_port = 45454
# Create a TCP server (socket type: SOCK_STREAM)
server = await asyncio.start_server(handle_server, _ip, _port,family=socket.AF_INET, reuse_address=True)
async with server: 
await server.serve_forever()  
async def handle_server(reader, writer):
# read data
print("start reading")
init_time = datetime.datetime.now()
end_time = init_time + datetime.timedelta(seconds=1)
while datetime.datetime.now() < end_time:
raw_data_stream = await reader.readexactly(200)
print("time:", datetime.datetime.now()-init_time)
# close connection
writer.close()
await writer.wait_closed()
print("Connection closed")
if __name__ == "__main__":
asyncio.run(connect_to_sq())

程序应该在1秒数据传输后结束。但是,输出是:

start reading
time: 0:00:00.495863
time: 0:00:00.594812
time: 0:00:00.695760
time: 0:00:00.794883
time: 0:00:00.895336
time: 0:00:00.995024
time: 0:00:01.095308
Connection closed
start reading
time: 0:00:00.647908
time: 0:00:00.750355
time: 0:00:00.848436
......

它自动地无限地重复。这是什么原因,我该如何解决它?

问题

serve_forever()无限制地侦听端口。因此,即使单个连接在1秒后关闭,服务器也会继续接受新的连接。在这种情况下,您的传感器(客户端)似乎在旧连接关闭后创建了一个新连接,并且由于服务器仍然接受它们,因此handle_server再次从顶部运行。

<标题>

解决方案也许不是最好的方法™,但一个可能的解决方案是使用Future,以便handle_server可以在连接完成时向主代码发出信号。然后,主代码可以停止服务器,避免新的连接。可以这样做:

import datetime
import asyncio
import socket
from functools import partial
async def connect_to_sq():
_ip = "0.0.0.0"
_port = 45454
# Create a TCP server (socket type: SOCK_STREAM)
first_connection_completion = asyncio.Future()
server = await asyncio.start_server(
partial(handle_server, first_connection_completion=first_connection_completion),
_ip,
_port,family=socket.AF_INET,
reuse_address=True)
async with server: 
server_task = asyncio.create_task(server.serve_forever())
await first_connection_completion
server_task.cancel()
async def handle_server(reader, writer, first_connection_completion=None):
# read data
print("start reading")
init_time = datetime.datetime.now()
end_time = init_time + datetime.timedelta(seconds=1)
while datetime.datetime.now() < end_time:
raw_data_stream = await reader.readexactly(200)
print("time:", datetime.datetime.now()-init_time)
# close connection
writer.close()
await writer.wait_closed()
print("Connection closed")
first_connection_completion.set_result(None)
if __name__ == "__main__":
asyncio.run(connect_to_sq())

注意事项:

  • handle_server现在有一个额外的参数,first_connection_completion,这是将来用于从函数发送信号到主代码。参数已经使用functools.partial绑定到函数。在函数中,set_result被用来标记未来完成。

  • serve_forever()现在被create_task调用包装。这是因为我们不能对它进行await

新的代码看起来很乱,但是你总是可以重构的。所以更简洁的版本应该是:

import datetime
import asyncio
import socket
async def listen_once(handler, *server_args, **server_kwargs):
first_connection_completion = asyncio.Future()
async def wrapped_handler(*args):
await handler(*args)
first_connection_completion.set_result(None)
server = await asyncio.start_server(wrapped_handler, *server_args, **server_kwargs)
async with server:
server_task = asyncio.create_task(server.serve_forever())
await first_connection_completion
server_task.cancel()
async def connect_to_sq():
_ip = "0.0.0.0"
_port = 45454
# Create a TCP server (socket type: SOCK_STREAM)
await listen_once(handle_server, _ip, _port, family=socket.AF_INET, reuse_address=True)
async def handle_server(reader, writer):
# read data
print("start reading")
init_time = datetime.datetime.now()
end_time = init_time + datetime.timedelta(seconds=1)
while datetime.datetime.now() < end_time:
raw_data_stream = await reader.readexactly(200)
print("time:", datetime.datetime.now()-init_time)
# close connection
writer.close()
await writer.wait_closed()
print("Connection closed")
if __name__ == "__main__":
asyncio.run(connect_to_sq())

最新更新