在python中启动多个vunicorn应用



我需要创建一个Python应用程序来处理API (fastAPI)和套接字(socketio)。我找不到在同一个python脚本中启动两个vunicorn应用程序的方法。请注意,我可以用任何其他允许我修复此问题的库替换vunicorn。代码:

import json
from fastapi import FastAPI
import socketio
import uvicorn

# create API app
app_api = FastAPI()
# create a Socket.IO server and wrap with a WSGI application
sio = socketio.Server(port=8001)
app_sio = socketio.WSGIApp(sio)

@app_api.get("/")
def read_root():
return {"Hello": "World"}

@sio.on('*', namespace="/aws-iam")
def catch_all(event, sid, data):
print(event, sid, data)

我不确定如何启动app_apiapp_sio。我不能从主线程开始,因为uvicorn.run(...)阻塞,只有第一个调用会运行。我试图在两个不同的线程中启动它们,但我得到了错误:

if __name__ == "__main__":
def start_api():
uvicorn.run("testing.mock_api:app_api", host='127.0.0.1', port=8000, reload=True, debug=True) #, workers=3)
def start_sio():
uvicorn.run("testing.mock_api:app_sio", host='127.0.0.1', port=8001, reload=True, debug=True) # , workers=3)

from threading import Thread
import time
threads = [
Thread(target=start_sio),
Thread(target=start_api),
]
for thread in threads:
thread.start()

time.sleep(999999999999)

与多线程我得到错误:

File "/src/testing/mock_api.py", line 55, in start_api
uvicorn.run("testing.mock_api:app_api", host='127.0.0.1', port=8000, reload=True, debug=True) #, workers=3)
File "/usr/local/lib/python3.6/dist-packages/uvicorn/main.py", line 447, in run
ChangeReload(config, target=server.run, sockets=[sock]).run()
File "/usr/local/lib/python3.6/dist-packages/uvicorn/supervisors/basereload.py", line 43, in run
self.startup()
File "/usr/local/lib/python3.6/dist-packages/uvicorn/supervisors/basereload.py", line 62, in startup
signal.signal(sig, self.signal_handler)
File "/usr/lib/python3.6/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread

与其分别运行这两个应用程序,不如只运行FastAPI应用程序&把socketio应用挂载到这个上面?

要用这个github问题中的一个示例来说明,您可以这样做:

import socketio    
from fastapi import FastAPI
import uvicorn 


app = FastAPI()    

sio = socketio.AsyncServer(async_mode='asgi')    
socket_app = socketio.ASGIApp(sio)    

@sio.on('*', namespace="/aws-iam")
async def catch_all(event, sid, data):
print(event, sid, data)

@app.get("/hello")
async def root():
return {"message": "Hello World"}

app.mount('/', socket_app)

if __name__ == "__main__":
uvicorn.run("main:app", reload=True, port=8080)

包括将socketio应用切换为异步

最新更新