异步管理连接的客户端,同时用Python连续发送数据



我有一个用Python构建的服务器,它使用Sanic和websocket定期向客户端广播数据:

@app.websocket("/")
async def websocket(request, ws):
app.ws_clients.add(ws)
await ws.send(json.dumps("hello from climate server!"))
while True:
try:
data = dict()
time_of_reading = time.ctime(time.time())
data['climateData'] = sensor.read_data()
data['systemData'] = get_system_data()
data['timestamp'] = time_of_reading
await broadcast(json.dumps(data))
time.sleep(10) # changing this to asyncio.sleep() causes the msgs to send sporatically
except KeyboardInterrupt:
sensor.clear()
pass
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, workers=1, debug=False)

以及我的广播功能,它试图发送消息,或者在出现ConnectionClosed错误时从app.ws_clients中删除客户端:

async def broadcast(message):
for ws in app.ws_clients:
try:
await ws.send(message)
print('attempting data send') # this line runs, but the clients don't receive the messages
except websockets.ConnectionClosed:
clients_to_remove.add(ws)
except KeyboardInterrupt:
sensor.clear()
pass
if (len(clients_to_remove) > 0):
await remove_dead_clients(clients_to_remove)
async def remove_dead_clients(clients_to_remove):
for client in clients_to_remove:
app.ws_clients.remove(client)
clients_to_remove.clear()

客户端能够很好地连接,服务器打印出它正在尝试广播,但客户端从未接收到任何消息。

我正在使用我编写的另一台服务器上的广播功能,它在那里运行得很好。与之不同的是,它只在客户端请求时发送数据。我觉得这里的问题是异步无法同时处理连续广播和删除客户端。我尝试将time.sleep()更改为asyncio.sleep(),但这只成功地一次发送了几十条消息,然后在一段时间内什么都没有。

有没有一种模式可以满足我的需求,我可以在无休止的循环中发送消息,还可以异步管理连接的客户端?

对于将来偶然发现这一点的人,我忘记在asyncio.sleep()之前使用await关键字。

最新更新