我有以下生产者-消费者架构:
- 接受连接的Websockets服务器。连接的客户端发送数据。传入的数据被放入队列
- 从队列中读取并处理传入数据的协程
问题是,我被困在客户端处理程序中了找不到更好的词了。我找不到将参数传递给客户端处理程序的方法,因此无法访问队列以向外转发client_handler
的数据到目前为止的代码
import asyncio
import websockets
# Websockets client Handler accepts data and puts it into queue
async def client_handler(websocket, path):
print(f"Connected with path '{path}'")
async for msg_rx in websocket:
if not msg_rx:
break
print(f"RX: {msg_rx }")
# TODO Add to Queue
# HOW DO I ACCESS THE QUEUE?
print(f"Disconnected from Path '{path}'")
async def task_ws_server(q):
# TODO how do I pass q to the client handler???
async with websockets.serve(client_handler, '127.0.0.1', 5001):
await asyncio.Future() # run forever
async def task_consumer(q):
# get elements from Queue
while True:
data = await q.get()
# Process them like storing to file or forward to other code
print(data) # print as stand-in for more complex code
q.task_done()
async def main():
# Queue to allow moving data from client_handler to Task_consumer
q = asyncio.Queue()
# Start consumer task
consumer = asyncio.create_task(task_consumer(q))
# Start and run WS Server to handle incoming connections
await asyncio.gather(*[
asyncio.create_task(task_ws_server(q)),
])
await q.join()
consumer.cancel()
if __name__ == '__main__':
asyncio.run(main())
我找到了一个解决方案:将队列声明移到顶部,这意味着可以在异步函数内部访问队列。我不喜欢这个解决方案,因为它意味着我必须在本地声明client_handler或在全局作用域中公开队列
您可以让您的client_handler
以队列作为参数,并使用functools.partial
创建一个可以传递给websockets.serve
的函数
import functools
async def client_handler(websocket, path, queue):
# Do something with queue
pass
async def task_ws_server(q):
queued_client_handler = functools.partial(client_handler, queue=q)
async with websockets.serve(queued_client_handler, '127.0.0.1', 5001):
await asyncio.Future() # run forever
我找到了一个面向对象的解决方案。通过self,您可以访问client_handler
之外的变量import asyncio
import websockets
class CustomWebSocketServer():
def __init__(self, host='127.0.0.1', port=5001):
self.host = host
self.port = port
self.queue_rx = None
self.ws_clients = set()
async def run(self):
# Queue to allow moving data from client_handler to Task_consumer
self.queue_rx = asyncio.Queue()
# Start consumer task
consumer = asyncio.create_task(self.task_consumer())
# Start and run WS Server to handle incoming connections
await websockets.serve(self.client_handler, '127.0.0.1', 5001)
await asyncio.Future() # run forever
await self.queue_rx.join()
consumer.cancel()
# Websockets client Handler accepts data and puts it into queue
async def client_handler(self, ws, path):
print(f"Connected with path '{path}'")
try:
# Register ws client
self.ws_clients.add(ws)
async for msg_rx in ws:
if not msg_rx:
break
printf"RX: {msg_rx}")
# Add to Queue
await self.queue_rx.put(msg_rx)
finally:
self.ws_clients.remove(ws)
print(f"Disconnected from Path '{path}'")
async def task_consumer(self):
print"task consumer start")
# get elements from Queue
while True:
data = await self.queue_rx.get()
# Process them like storing to file or forward to other code
print(data) # print as stand-in for more complex code
self.queue_rx.task_done()
if __name__ == '__main__':
Server = CustomWebSocketServer()
asyncio.run(Server.run())