我使用以下代码来测试本地 websocket 服务器是否正在运行:
import asyncio
import websockets
async def hello():
async with websockets.connect('ws://127.0.0.1:8000/ws/registration/123') as websocket:
await websocket.send(json_data)
asyncio.get_event_loop().run_until_complete(hello())
有没有更简单的方法可以在不使用 asyncio 的情况下做到这一点?例如:
import asyncio
import websockets
conn = websockets.connect('ws://127.0.0.1:8000/ws/registration/123')
conn.send('hello')
基本上,我只是想找到最简单的方法来测试我的 websocket 服务器是否正在侦听和接收特定 url 的消息。
async_to_sync
不会使这变得更加复杂吗?为什么不直接创建一个普通的test_url
函数:
def test_url(url, data=""):
async def inner():
async with websockets.connect(url) as websocket:
await websocket.send(data)
return asyncio.get_event_loop().run_until_complete(inner())
test_url("ws://127.0.0.1:8000/ws/registration/123")
您可以使用async_to_sync
来执行上述操作,例如:
from asgiref.sync import async_to_sync
import websockets
def test_url(url, data=""):
conn = async_to_sync(websockets.connect)(url)
async_to_sync(conn.send)(data)
test_url("ws://127.0.0.1:8000/ws/registration/123")
请注意,"握手"可能不会在这里完成,因为它需要双向接受,但上述内容应该使您能够进行测试以确保 url 被正确路由等。
如果需要禁用证书验证,可以执行以下操作:
#!/usr/bin/env python3
import asyncio
import ssl
import websockets
def test_url(url, data=""):
async def inner():
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
async with websockets.connect(url, ssl=ctx) as websocket:
await websocket.send(data)
return asyncio.get_event_loop().run_until_complete(inner())
test_url("ws://127.0.0.1:8000/ws/registration/123")