在 python 中测试 websocket 可用性的简单方法



我使用以下代码来测试本地 websocket 服务器是否正在运行:

import asyncio
import websockets
async def hello():
async with websockets.connect('ws://127.0.0.1:8000/ws/registration/123') as websocket:
await websocket.send(json_data)
asyncio.get_event_loop().run_until_complete(hello())

有没有更简单的方法可以在不使用 asyncio 的情况下做到这一点?例如:

import asyncio
import websockets
conn = websockets.connect('ws://127.0.0.1:8000/ws/registration/123')
conn.send('hello')

基本上,我只是想找到最简单的方法来测试我的 websocket 服务器是否正在侦听和接收特定 url 的消息。

async_to_sync不会使这变得更加复杂吗?为什么不直接创建一个普通的test_url函数:

def test_url(url, data=""):
async def inner():
async with websockets.connect(url) as websocket:
await websocket.send(data)
return asyncio.get_event_loop().run_until_complete(inner())
test_url("ws://127.0.0.1:8000/ws/registration/123")

您可以使用async_to_sync来执行上述操作,例如:

from asgiref.sync import async_to_sync
import websockets
def test_url(url, data=""):
conn = async_to_sync(websockets.connect)(url)
async_to_sync(conn.send)(data)

test_url("ws://127.0.0.1:8000/ws/registration/123")

请注意,"握手"可能不会在这里完成,因为它需要双向接受,但上述内容应该使您能够进行测试以确保 url 被正确路由等。

如果需要禁用证书验证,可以执行以下操作:

#!/usr/bin/env python3 

import asyncio
import ssl
import websockets 
def test_url(url, data=""):
async def inner():
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
async with websockets.connect(url, ssl=ctx) as websocket:
await websocket.send(data)
return asyncio.get_event_loop().run_until_complete(inner())
test_url("ws://127.0.0.1:8000/ws/registration/123")

最新更新