asyncio - 如何在不停止事件循环的情况下停止(和重新启动)服务器?



在这种情况下,我正在使用websockets模块。 典型的服务器实现是这样的:

import websockets
start_server = websockets.serve(counter, "localhost", 6789)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()  

但是,我希望能够在不停止事件循环的情况下停止和重新启动服务器。 从我看到的关于 asyncio 服务器的最少文档中,我不清楚如何做到这一点。 我也不知道 websockets 是否以完全相同的方式实现。

例如,如果我做这样的事情:

def counter():
start_server = websockets.serve(connection_handler, 'localhost', 6789)
this = loop.run_until_complete(start_server)
try:
loop.run_forever()
finally:
this.close()
loop.run_until_complete(this.wait_closed())  

loop = asyncio.get_event_loop()
loop.create_task(anothertask)
startcounter = counter()

我可以通过调用 loop.stop(( 来触发服务器停止。 如何在不停止循环的情况下停止服务器(并扰乱循环上运行的另一个任务(?

您可以使用asyncio.create_task在循环运行时提交任务。run_forever()模式后跟run_until_complete()现在已被弃用,因为它与asyncio.run不兼容,而现在是运行 asyncio 代码的首选方式。

推荐的方法是在顶层使用asyncio.run来启动异步入口点(通常定义为async def main()(,然后从那里执行其余工作。 然后run_until_complete(x)变得简单await x,不需要run_forever(),因为您可以等待诸如server.serve_forever()或您选择的asyncio.Event之类的事情。

由于 websockets 服务器似乎不存在serve_forever,以下是带有事件的变体(未经测试(:

async def connection_handler(...):
...
async def test(stop_request):
# occasionally stop the server to test it
while True:
await asyncio.sleep(1)
print('requesting stop')
stop_request.set()
async def main():
stop_request = asyncio.Event()
asyncio.create_task(test(stop_request))
while True:
print('starting the server')
server = await websockets.serve(connection_handler, 'localhost', 6789)
await stop_request.wait()
stop_request.clear()
print('stopping the server')
server.close()
await server.wait_closed()
asyncio.run(main())

这个答案是基于user4815162342提供的建议和答案,以及从websockets模块文档中借用了一些东西。我回答的原因主要是提供一个更充实的例子。我认为使用 asyncio 最困难的部分之一是在不破坏它的情况下将其与其他代码段集成。

关于此实现,我想指出的一件事是 user4815162342 代码行中的"服务器"对象:

async def somefunction(): # requires async function
server = await websockets.serve(handler, 'localhost', 6789)

是使用这段代码创建的相同"服务器"对象,您将在 websockets 文档中经常看到:

def somefunction(): # non-async function
start_server = websockets.serve(handler, 'localhost', 6789)
server = asyncio.run_until_complete(start_server)

因此,它将响应方法close().

此外,websockets的作者建议服务器可以通过以下方式启动:

async def somefunction(): # requires async function
async with websockets.serve(handler, 'localhost', 6789)
await something_that_says_stop

请注意,哪些可以从非异步函数调用,哪些必须"循环"发生。

在我的示例中(有点奇特的脚本(,我集成了一个 tkinter GUI。这是通过 aiotkinter 模块完成的,除了它的 pypi 页面之外,在互联网上几乎没有任何其他提及,更不用说示例代码了。因此,下面是如何在可以启动和停止 websockets 服务器的场景中将其与 websockets 模块一起使用的示例。

请注意,我不调用 root.mainloop((。相反,这里的"阻塞行"是 asyncio.run(( 行。还要注意set_event_loop_policy线的放置位置。

这已经在mac上使用python 3.8.2和TclTk 8.6进行了测试。谢谢用户4815162342的帮助。asyncio 的语言越来越清晰,我认为我们需要获得更多使用较新语言/结构的示例代码和答案......

import sys
from functools import partial
import asyncio
import aiotkinter
from tkinter import *
from tkinter import messagebox
import json
import websockets
from netifaces import interfaces, ifaddresses, AF_INET

async def register(w, label2):
USERS.remove(w) if w in USERS else USERS.add(w)
label2.config(text=f"User Count = {len(USERS)}")

async def connection_handler(websocket, path, stop_request,
ticker, start_request, label, label2):
misterDict = {"Mr.White":"white", "Mr.Orange":"orange",
"Mr.Blonde":"khaki", "Mr.Pink":"pink",
"Mr.Blue":"blue", "Mr.Brown":"brown"}
await register(websocket, label2)
try:
message = json.dumps("welcome")
await websocket.send(message)
# ignore this -> while server.get():
async for message in websocket:
msg = json.loads(message)
if msg == "end":
return
elif msg == "stoptick":
ticker.set(False)
elif msg == "starttick":
ticker.set(True)
startTicker(ticker, label)
elif msg == "stopserver":
stop_request.set()
break
elif msg in misterDict.keys():
color = misterDict[msg]
changeColor(label, color)
# ignore this -> await asyncio.sleep(.05)
except Exception as E1:
print(f"Error! {E1}")
finally:
await register(websocket, label2)
async def main():
stop_request = asyncio.Event()
start_request = asyncio.Event()
# Some tkinter setup
root = Tk()
root.title("Color Server")
root.minsize(250, 250)
ticker = BooleanVar()
ticker.set(0)
server = BooleanVar()
server.set(0)
label = Label(master=root, height=4, width=20, bg="white",
bd=5)
label.pack()
buttonS = Checkbutton(master=root, text='Mr tick', variable=ticker,
height=3, command=partial(startTicker,
ticker, label))
buttonS.pack()
buttonW = Checkbutton(master=root, text='Websocket Server',
variable=server, height=3,
command=partial(startWeb, server,
label, stop_request, start_request))
buttonW.pack()
def on_closing():
if messagebox.askokcancel("Quit", "Do you want to quit?"):
QUITTER.add("yes")
if server.get():
stop_request.set()
else:
start_request.set()

buttonX = Button(master=root, text='Shut down everything', height=2,
command=on_closing)
buttonX.pack()
label2 = Label(master=root, text='User Count = 0',
height=2, bg="white", bd=2)
label2.pack()
root.protocol("WM_DELETE_WINDOW", on_closing)
# websocket server setup code.
serverlist = set()
iplist = [ifaddresses(face)[AF_INET][0]["addr"]
for face in interfaces() if AF_INET in ifaddresses(face)]
print(f"the interfaces found = {iplist}")
while True:
await start_request.wait()
start_request.clear()
if "yes" in QUITTER:
break
server.set(1)
for ipadd in iplist: # for each IP address in the system
# setup a websocket server
bound_handler = partial(connection_handler,
stop_request=stop_request, ticker=ticker,
start_request=start_request,
label=label, label2=label2)
sockserver = await websockets.serve(bound_handler, ipadd, 6789)
serverlist.add(sockserver)
await stop_request.wait()
stop_request.clear()
for sockserver in serverlist:
sockserver.close()
await sockserver.wait_closed()
serverlist.clear()
if "yes" in QUITTER:
break
server.set(0)

def startWeb(server, label, stop_request, start_request):
if not server.get():
stop_request.set()
return
else:
start_request.set()

def changeColor(label, color):
label.config(bg=color)

async def mRtick(ticker, label):
ticktock = [("tick", "w"), ("tock", "e")]
tocker = 0
while ticker.get():
a,b = ticktock[tocker]
label.config(text=a, anchor=b)
tocker = not tocker
await asyncio.sleep(0.5)
label.config(text="")

def startTicker(ticker, label):
if not ticker.get():
return
asyncio.create_task(mRtick(ticker, label))

QUITTER = set()
USERS = set()
asyncio.set_event_loop_policy(aiotkinter.TkinterEventLoopPolicy())
asyncio.run(main())
print("we've ended...")
sys.exit()

请注意,我发现代码中有一个不必要的 while 循环。这现在已经被注释掉了。

相关内容

  • 没有找到相关文章

最新更新