如何在使用Gunicorn和多个工人时仅创建1个类实例?



我有一个简单的Python后端使用falconwebsockets。如果一个客户端调用一个端点(例如,提交数据),所有其他连接的客户端通过他们各自的websocket连接得到通知,也就是说,后端向所有当前连接的客户端发出广播。一般来说,这工作得很好。以下是falcon应用程序的最小脚本

import falcon
from db.dbmanager import DBManager
from ws.wsserver import WebSocketServer
from api.resources.liveqa import DemoResource
dbm = DBManager() # PostgreSQL connection pool; works fine with multiple workers
wss = WebSocketServer() # Works only with 1 worker
app = falcon.App()
demo_resource = DemoResource(dbm, wss)
app.add_route('/api/v1/demo', demo_resource)

下面是我实例化并传递资源类的websockets服务器的代码:

import json
import asyncio
import websockets
import threading

class WebSocketServer:
def __init__(self):
self.clients = {}
self.start_server()

async def handler(self, ws, path):
session_id = path.split('/')[-1]
if session_id in self.clients:
self.clients[session_id].add(ws)
else:
self.clients[session_id] = {ws}
try:
async for msg in ws:
pass # The clients are not supposed to send anything
except websockets.ConnectionClosedError:
pass
finally:
self.clients[session_id].remove(ws)

async def send(self, client, msg):
await client.send(msg)

def broadcast(self, session_id, msg):
if session_id not in self.clients:
return
for client in self.clients[session_id]:
try:
asyncio.run(self.send(client, json.dumps(msg)))
except:
pass

def start_server(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.handler, host='111.111.111.111', port=5555)
asyncio.get_event_loop().run_until_complete(start_server)
threading.Thread(target=asyncio.get_event_loop().run_forever).start()

我使用Gunicorn作为后端服务器,如果我只使用1个worker,它就可以工作。但是,如果我尝试--workers 2,我得到端口5555已经在使用的错误。我想这是有意义的,因为每个工作人员都试图使用相同的ip/端口对创建一个WebSocketServer实例。

解决这个问题的最好/最干净/最自然的方法是什么?我假设我必须确保只创建一个WebSocketServer实例。但如何?

附带说明,我假设为每个worker也创建了一个DBManager实例。虽然它不会抛出错误,因为可以有多个连接池,我想确保DBManager的单个实例也是首选的方式。

首先,即使只运行一个worker也有潜在的问题,因为Gunicorn主要是一个预分叉服务器,而用线程分叉一个进程通常是不安全的,可能会导致不可预测的结果。

解决这个问题的一种方法是使用Gunicorn的服务器钩子,只在一个worker中启动一个线程(在这个例子中是WebSocket服务器),并且只在fork之后启动。例如,

import logging
import os
import threading
import falcon
import gunicorn.app.base
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)

class HelloWorld:
def on_get(self, req, resp):
resp.media = {'message': 'Hello, World!'}

def do_something(fork_nr):
pid = os.getpid()
logging.info(f'in a thread, {pid=}')
if fork_nr == 1:
logging.info('we could start a WebSocket server...')
else:
logging.info('not the first worker, not starting any servers')

class HybridApplication(gunicorn.app.base.BaseApplication):
forks = 0
@classmethod
def pre_fork(cls, server, worker):
logging.info(f'about to fork a new worker #{cls.forks}')
cls.forks += 1
@classmethod
def post_fork(cls, server, worker):
thread = threading.Thread(
target=do_something, args=(cls.forks,), daemon=True)
thread.start()
def __init__(self):
self.options = {
'bind': '127.0.0.1:8000',
'pre_fork': self.pre_fork,
'post_fork': self.post_fork,
'workers': 4,
}
self.application = falcon.App()
self.application.add_route('/hello', HelloWorld())
super().__init__()
def load_config(self):
config = {key: value for key, value in self.options.items()
if key in self.cfg.settings and value is not None}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application

if __name__ == '__main__':
HybridApplication().run()

这个简单的原型并不是绝对正确的,因为我们还应该处理服务器重载、worker被杀死等问题。说到这一点,对于可能长时间运行的请求,您可能应该使用另一种工作线程类型而不是sync,或者设置一个很长的超时,因为否则工作线程可能会被杀死,并带走WebSocket线程。指定一定数量的线程会自动将工作线程类型更改为gthread

请注意,这里我实现了一个自定义的Gunicorn应用程序,但是您可以通过配置文件指定钩子来达到相同的效果。

另一个选择是使用ASGI风格的Falcon,甚至在你的应用程序中实现WebSocket部分:

import asyncio
import logging
import falcon.asgi
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)

class HelloWorld:
async def on_get(self, req, resp):
resp.media = {'message': 'Hello, World!'}
async def on_websocket(self, req, ws):
await ws.accept()
logging.info(f'WS accepted {req.path=}')
try:
while True:
await ws.send_media({'message': 'hi'})
await asyncio.sleep(10)
finally:
logging.info(f'WS disconnected {req.path=}')

app = falcon.asgi.App()
app.add_route('/hello', HelloWorld())

请注意,Gunicorn本身不"说话"。所以你要么需要使用ASGI应用服务器,要么使用Gunicorn作为Uvicorn工人的进程管理器。例如,假设您的文件名为test.py,您可以直接运行Uvicorn:

pip install uvicorn[standard]
uvicorn test:app

然而,如果你走ASGI路线,你需要实现你的响应作为协程函数(async def on_get(...)等),或者在线程池执行器中运行同步DB代码。

相关内容

  • 没有找到相关文章

最新更新