Python, FastAPI: multithreading / multiprocessing - start/stop a process with API endpoints



I have been struggling with this and can't figure it out; any help would be much appreciated. I have a FastAPI server on which I have deployed a drowsiness detection model/script (dlib, opencv2, scipy). What I want to achieve is to start and stop the DDM (drowsiness detection model) via API endpoints. The problem is that the uvicorn server is single-threaded, so when I run the DDM it runs in the same thread, and when I try to stop the DDM it stops the whole server process (which is not what I want). I have already tried forking the process and running the DDM in the fork, but it throws an error and crashes. I think multithreading might help, but I'm not sure; and even if it does solve my problem, I don't know exactly how to approach it. Relevant code:

# Drowsiness Detection Script
import os

import cv2
import dlib
from imutils import face_utils
from scipy.spatial import distance


def eye_aspect_ratio(eye):
    A = distance.euclidean(eye[1], eye[5])
    B = distance.euclidean(eye[2], eye[4])
    C = distance.euclidean(eye[0], eye[3])
    ear = (A + B) / (2.0 * C)
    return ear


def detect_drowsiness(monitor: bool):
    # Record this process's PID so the stop endpoint can find it later
    pid_file = open("intelligence/drowsiness_detection/dataset/pid.txt", "w")
    pid_str = str(os.getpid())
    pid_file.write(pid_str)
    pid_file.close()

    thresh = 0.25
    frame_check = 18
    detect = dlib.get_frontal_face_detector()
    # Dat file is the crux of the code
    predict = dlib.shape_predictor(
        "intelligence/drowsiness_detection/dataset/shape_predictor_68_face_landmarks.dat")

    (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["left_eye"]
    (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["right_eye"]
    cap = cv2.VideoCapture(0)
    flag = 0
    while monitor:
        ret, frame = cap.read()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        subjects = detect(gray, 0)
        for subject in subjects:
            shape = predict(gray, subject)
            shape = face_utils.shape_to_np(shape)  # converting to NumPy Array
            leftEye = shape[lStart:lEnd]
            rightEye = shape[rStart:rEnd]
            leftEAR = eye_aspect_ratio(leftEye)
            rightEAR = eye_aspect_ratio(rightEye)
            ear = (leftEAR + rightEAR) / 2.0
            if ear < thresh:
                flag += 1
                print("Detecting, {}".format(flag))
                if flag >= frame_check:
                    print("ALERT - Drowsy")
            else:
                flag = 0
    cap.release()




# Drowsiness detection for a user
@router.get("/face/drowsy/start", response_description="Drowsiness monitoring for the user")
async def start_drowsiness_detection(background_tasks: BackgroundTasks):
    background_tasks.add_task(detect_drowsiness, True)
    return "Drowsiness monitoring ON"


@router.get("/face/drowsy/stop", response_description="Drowsiness monitoring for the user")
async def stop_drowsiness_detection():
    pid_file_path = "intelligence/drowsiness_detection/dataset/pid.txt"
    if not os.path.exists(pid_file_path):
        return "Please start monitoring first"
    with open(pid_file_path, "r") as pid_file:
        pid_str = pid_file.read()
    remove_file(pid_file_path)
    os.kill(int(pid_str), signal.SIGKILL)

    return "Drowsiness monitoring OFF"

A possible solution:

# Drowsiness Detection Script
def eye_aspect_ratio(eye):
    A = distance.euclidean(eye[1], eye[5])
    B = distance.euclidean(eye[2], eye[4])
    C = distance.euclidean(eye[0], eye[3])
    ear = (A + B) / (2.0 * C)
    return ear


class DrowsinessDetector(Process):
    running = Event()

    def stop_monitoring(self):
        if self.running.is_set():
            self.running.clear()

    def start_monitoring(self):
        if self.running.is_set():
            return
        self.running.set()
        self.detect_drowsiness()

    def detect_drowsiness(self):
        thresh = 0.25
        frame_check = 18
        detect = dlib.get_frontal_face_detector()
        # Dat file is the crux of the code
        predict = dlib.shape_predictor("./shape_predictor_68_face_landmarks.dat")
        (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["left_eye"]
        (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_68_IDXS["right_eye"]
        cap = cv2.VideoCapture(0)
        flag = 0
        while self.running.is_set():
            ret, frame = cap.read()
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            subjects = detect(gray, 0)
            for subject in subjects:
                shape = predict(gray, subject)
                shape = face_utils.shape_to_np(shape)  # converting to NumPy Array
                leftEye = shape[lStart:lEnd]
                rightEye = shape[rStart:rEnd]
                leftEAR = eye_aspect_ratio(leftEye)
                rightEAR = eye_aspect_ratio(rightEye)
                ear = (leftEAR + rightEAR) / 2.0
                if ear < thresh:
                    flag += 1
                    print("Detecting - {}".format(flag))
                    if flag >= frame_check:
                        print("ALERT - Drowsy")
                else:
                    flag = 0
        cap.release()


# Drowsiness detection for a user
drowsy = DrowsinessDetector()

@router.get("/face/drowsy/start", response_description="Drowsiness monitoring for the user")
async def start_drowsiness_detection(background_tasks: BackgroundTasks):
    background_tasks.add_task(drowsy.start_monitoring())
    return "Drowsiness monitoring ON"


@router.get("/face/drowsy/stop", response_description="Drowsiness monitoring for the user")
async def stop_drowsiness_detection(background_tasks: BackgroundTasks):
    background_tasks.add_task(drowsy.stop_monitoring())
    return "Drowsiness monitoring OFF"

I got this solution from Reddit, but for some reason it doesn't work. Any help would be appreciated.

You can also put your non-async code into a standard, synchronous route def (which is actually the approach FastAPI encourages); FastAPI will then run that code in an external threadpool and manage it for you. From there, you can simply check whatever state you like (a file, Redis, an in-memory dictionary, pub/sub) inside your while loop to stop the drowsiness detector.

https://fastapi.tiangolo.com/async/path-operation-functions
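A rough sketch of that idea, assuming the routes and the detection loop live in the same process: keep detect_drowsiness a plain synchronous function (FastAPI runs sync callables in its threadpool, whether as the body of a def route or when scheduled through BackgroundTasks), and use a module-level threading.Event as the state the while loop checks; the stop route only flips the flag. The simplified loop body and route shapes below are placeholders, not your original implementation.

# Sketch of the "check shared state in the loop" idea; the Event and the
# elided loop body are placeholders for the original camera/EAR logic.
import threading

from fastapi import APIRouter, BackgroundTasks

router = APIRouter()
stop_event = threading.Event()

def detect_drowsiness():
    # ... open the camera and load the dlib models as in the original script ...
    while not stop_event.is_set():
        # ... read a frame, compute the EAR, raise the drowsiness alert ...
        pass
    # ... release the camera ...

@router.get("/face/drowsy/start")
def start_drowsiness_detection(background_tasks: BackgroundTasks):  # plain def, not async def
    stop_event.clear()
    background_tasks.add_task(detect_drowsiness)  # sync callable -> runs in a threadpool thread
    return "Drowsiness monitoring ON"

@router.get("/face/drowsy/stop")
def stop_drowsiness_detection():
    stop_event.set()  # the while loop sees the flag and exits on its own
    return "Drowsiness monitoring OFF"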

Although it is not explicitly mentioned in the FastAPI documentation, BackgroundTasks.background_tasks will create a new thread on the same process.
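If you want to verify that on your own machine, a throwaway debug endpoint along these lines (my own helper, not part of your code) logs the same PID for the endpoint and the background task but different thread identifiers:

import os
import threading

from fastapi import BackgroundTasks

def report_where_it_runs(label: str):
    # Print the process id and the id of the thread executing this call
    print(label, "pid:", os.getpid(), "thread:", threading.get_ident())

@router.get("/debug/where")
async def where_do_tasks_run(background_tasks: BackgroundTasks):
    report_where_it_runs("endpoint   ->")
    background_tasks.add_task(report_where_it_runs, "background ->")
    return "check the server log"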

Using the first code you posted - when you store the PID (process ID) to a file inside the detect_drowsiness() function and then kill that process in the stop_drowsiness_detection() route/function, you are actually killing the process that is running FastAPI itself (because the background task runs inside the server process, os.getpid() there returns the server's own PID).

In the Background Tasks section of the FastAPI documentation, they specifically mention:

If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc.), you might benefit from using other bigger tools like Celery.
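For completeness, a bare-bones version of that suggestion might look like the snippet below; the broker URL and task name are placeholders, and it assumes a Redis broker plus a separately started Celery worker:

# Hypothetical Celery wiring, only to illustrate the docs' suggestion.
from celery import Celery

celery_app = Celery("drowsiness", broker="redis://localhost:6379/0")

@celery_app.task
def detect_drowsiness_task():
    ...  # the camera loop would run inside a Celery worker process

# Inside the FastAPI routes one would then do roughly:
#   result = detect_drowsiness_task.delay()                # start
#   celery_app.control.revoke(result.id, terminate=True)   # stop the running task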

Regarding the second code you posted, the use of multiprocessing seems to be heading in the right direction. Without more detail about why that particular implementation doesn't work, it is hard to help you further.
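In case a concrete starting point helps, this is how that direction is commonly wired up; it is only a sketch with the loop body elided, not a drop-in replacement for your class. The Event is created per instance, the loop lives in run() so that .start() executes it in the child process, and stopping means setting the flag and joining:

# Sketch of a stoppable detector process; loop body and route shapes simplified.
import multiprocessing

class DrowsinessDetector(multiprocessing.Process):
    def __init__(self):
        super().__init__(daemon=True)
        self.stop_event = multiprocessing.Event()  # inherited by the child process

    def run(self):  # runs in the child process after .start()
        # ... open the camera and load the dlib models here ...
        while not self.stop_event.is_set():
            # ... frame grabbing / EAR check ...
            pass
        # ... release the camera ...

detector = None

@router.get("/face/drowsy/start")
def start_drowsiness_detection():
    global detector
    if detector is None or not detector.is_alive():
        detector = DrowsinessDetector()
        detector.start()  # spawns the child; FastAPI's own process is untouched
    return "Drowsiness monitoring ON"

@router.get("/face/drowsy/stop")
def stop_drowsiness_detection():
    if detector is not None and detector.is_alive():
        detector.stop_event.set()  # the child notices the flag and exits its loop
        detector.join()
    return "Drowsiness monitoring OFF"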

A similar question (involving image processing with cv2 + FastAPI) was solved by opening a new process for the task, without using BackgroundTasks. Here is a snippet:

import concurrent.futures

@app.post("/do_image_thing")
async def do_image_thing(params: Params):
    detect_bool = params.detect_bool  # params is just an input of type pydantic BaseModel, irrelevant for this problem
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.submit(detect_drowsiness, detect_bool)
    return {'stuff': 'stuff'}