在 Python 中建立多个 websocket 客户端连接的最佳方法



我很欣赏我要问的问题相当广泛,但是,作为Python的新手,我正在努力寻找[最佳]方法来做一些事情,比如说,Node.js,在其他环境(如C#)中是微不足道的。

假设有一个装满东西的仓库。假设该仓库上有一个具有两个特征的 websocket 接口:在客户端连接上,它会抽出仓库当前库存的完整列表,然后在库存更改时进行进一步的流式更新。

网络上充斥着如何在Python中连接到仓库并响应其状态变化的示例。但。。。

如果我想连接到两个仓库并根据从每个仓库分别检索到的组合信息执行某些操作,该怎么办?如果我想根据时间等因素来做事情,而不是仅仅由库存更改和传入的 websocket 消息驱动,该怎么办?

在我看到的所有例子中 - 它开始感觉像数百个 - 在某个地方,以某种形式,一个run()或一个run_forever()或一个run_until_complete()等。换句话说,I/O 可能是异步的,但代码中总是存在大量的阻塞操作,并且总是有两个不适合我的情况的基本假设:只有一个 websocket 连接,并且所有处理都将由 [单个] websocket 服务器发送的事件驱动。

我不清楚我的问题的答案是某种使用多个事件循环,还是多个线程,或者其他什么。

迄今为止,尝试使用Python感觉就像在顶层公寓,欣赏古怪但不可否认的优雅装饰。但是当你进入电梯,按下标有"并行"或"并发"的按钮,加速器进入自由落体状态,最终把你放在一个地下室里,里面装满了一些非常丑陋和冒着蒸汽的管道。

。从华丽的隐喻回到技术上,我正在努力解决的关键是Python等价物,比如Node.js代码,它可以像以下示例一样简单[为简单起见不优雅]:

var aggregateState = { ... some sort of representation of combined state ... };
var socket1 = new WebSocket("wss://warehouse1");
socket1.on("message", OnUpdateFromWarehouse);
var socket2 = new WebSocket("wss://warehouse2");
socket2.on("message", OnUpdateFromWarehouse);
function OnUpdateFromWarehouse(message)
{
... Take the information and use it to update aggregate state from both warehouses ...
}

回答我自己的问题,希望它可以帮助其他 Python 新手......asyncio似乎是要走的路(尽管有一些陷阱,例如您可以死锁事件循环的惊人轻松)。

假设使用一个异步友好的 websocket 模块,例如 websockets,似乎有效的是一个遵循以下行的框架 - 为了简单起见,剪掉了诸如重新连接之类的逻辑。(前提仍然是一个仓库,它发送其完整库存的初始列表,然后将更新发送到该初始状态。

class Warehouse:
def __init__(self, warehouse_url):
self.warehouse_url = warehouse_url
self.inventory = {}  # Some description of the warehouse's inventory

async def destroy():
if (self.websocket.open):
self.websocket.close()  # Terminates any recv() in wait_for_incoming() 
await self.incoming_message_task  # keep asyncio happy by awaiting the "background" task
async def start(self):
try:
# Connect to the warehouse
self.websocket = await connect(self.warehouse_url)          
# Get its initial message which describes its full state
initial_inventory = await self.websocket.recv()
# Store the initial inventory
process_initial_inventory(initial_inventory)
# Set up a "background" task for further streaming reads of the web socket
self.incoming_message_task = asyncio.create_task(self.wait_for_incoming())
# Done
return True
except:
# Connection failed (or some unexpected error)
return False
async def wait_for_incoming(self):
while self.websocket.open:
try:
update_message = await self.websocket.recv()
asyncio.create_task(self.process_update_message(update_message))
except:
# Presumably, socket closure
pass
def process_initial_inventory(self, initial_inventory_message):
... Process initial_inventory_message into self.inventory ...

async def process_update_message(self, update_message):
... Merge update_message into self.inventory ...
... And fire some sort of event so that the object's 
... creator can detect the change. There seems to be no ...
... consensus about what is a pythonic way of implementing events, ... 
... so I'll declare that - potentially trivial - element as out-of-scope ...

完成初始连接逻辑后,一个关键的事情是设置一个"后台"任务,该任务重复读取通过 websocket 传入的进一步更新消息。上面的代码不包括任何事件触发,但是process_update_message()可以通过多种方式/可以做到这一点(其中许多非常简单),允许对象的创建者随时随地以它认为合适的方式处理通知。只要对象的创建者继续很好地使用 asyncio 并参与协作式多任务处理,流消息将继续被接收,任何事件都将继续被触发。

完成此操作后,可以按照以下思路建立连接:

async def main():
warehouse1 = Warehouse("wss://warehouse1")
if await warehouse1.start():
... Connection succeeded. Update messages will now be processed 
in the "background" provided that other users of the event loop 
yield in some way ...
else:
... Connection failed ...
asyncio.run(main())

可以通过多种方式启动多个仓库,包括对每个仓库进行create_task(warehouse.start()),然后对任务进行gather以确保/检查它们是否都正常。

当需要退出时,为了让 asyncio 开心,停止它抱怨孤立的任务,并允许一切顺利关闭,有必要在每个仓库上打电话给destroy()

但是有一个共同的元素,这没有涵盖。扩展上面的原始前提,假设仓库也接受来自我们的 websocket 客户端的请求,例如"将 X 运送到 Y"。对这些请求的成功/失败响应将与常规更新消息一起出现;通常无法保证请求的 send() 之后的第一个 recv() 将是对该请求的响应。这使process_update_message()复杂化。

我找到的最佳答案可能被认为是也可能不是"pythonic",因为它使用Future的方式与.NET中的TaskCompletionSource非常相似。

让我们发明几个实现细节;任何现实世界的场景都可能看起来像这样:

  • 我们可以在向仓库提交指令时提供request_id
  • 来自仓库的成功/失败响应将request_id重复给我们(因此也区分了命令响应消息与库存更新消息)

第一步是拥有一个字典,该字典将挂起的、正在进行的请求的 ID 映射到Future对象:

def __init__(self, warehouse_url):
...
self.pending_requests = {}

发送请求的协程的定义如下所示:

async def send_request(self, some_request_definition)
# Allocate a unique ID for the  request
request_id = <some unique request id>
# Create a Future for the pending request
request_future = asyncio.Future()
# Store the map of the ID -> Future in the dictionary of pending requests
self.pending_requests[request_id] = request_future
# Build a request message to send to the server, somehow including the request_id
request_msg = <some request definition, including the request_id>
# Send the message 
await self.websocket.send(request_msg) 
# Wait for the future to complete - we're now asynchronously awaiting
# activity in a separate function
await asyncio.wait_for(command_future, timeout = None)
# Return the result of the Future as the return value of send_request()
return request_future.result()

调用方可以使用如下所示的内容创建请求并等待其异步响应:

some_result = await warehouse.send_request(<some request def>)

使这一切发挥作用的关键是修改和扩展process_update_message()以执行以下操作:

  • 区分请求响应与清单更新
  • 对于前者,提取请求 ID(我们发明的场景说它会重复给我们)
  • 查找请求的挂起Future
  • 对它执行set_result()(其值可以是任何值,具体取决于服务器的响应)。这会释放send_request()并导致解析来自它的等待。

例如:

async def process_update_message(self, update_message):
if <some test that update_message is a request response>:
request_id = <extract the request ID repeated back in update_message>
# Get the Future for this request ID
request_future = self.pending_requests[request_id]
# Create some sort of return value for send_request() based on the response
return_value = <some result of the request>
# Complete the Future, causing send_request() to return
request_future.set_result(return_value)
else:
... handle inventory updates as before ...

我没有使用带有 asyncio 的套接字,但您可能只是在寻找 asyncio 的open_connection

async def socket_activity(address, callback):
reader, _ = await asyncio.open_connection(address)
while True:
message = await reader.read()
if not message:  # empty bytes on EOF
break  # connection was closed
await callback(message)

然后将这些添加到事件循环中

tasks = []  # keeping a reference prevents these from being garbage collected
for address in ["wss://warehouse1", "wss://warehouse2"]:
tasks.append(asyncio.create_task(
socket_activity(address, callback)
))
# return tasks  # or work with them

如果要在协程中等待 N 个操作完成,可以使用.gather()

或者,您可能会发现龙卷风可以做您想要的一切以及更多(我基于我的答案)
Tornado websocket 客户端:如何异步on_message?(从未等待过协程)

最新更新