在具有多个工作线程的Uvicorn中使用多处理(线程锁定)



我正在构建一个API,通过uvicorn提供FastAPI服务。API具有使用python多处理库的端点。

一个端点为一个CPU绑定的任务生成多个进程,以便在parallel中执行这些任务。以下是高级代码逻辑概述:

import multiprocessing as mp
class Compute:

def single_compute(self, single_comp_data):
# Computational Task CPU BOUND
global queue
queue.put(self.compute(single_comp_data))
def multi_compute(self, task_ids):
# Prepare for Compuation
output = {}
processes = []
global queue
queue = mp.Queue()

# Start Test Objs Computation
for tid in task_ids:
# Load  task data here, to make use of object in memory cache
single_comp_data = self.load_data_from_cache(tid)
p = mp.Process(target=self.single_compute, args=single_comp_data)
p.start()
processes.append(p)
# Collect Parallel Computation
for p in processes:
result = queue.get()
output[result["tid"]]= result
p.join()
return output

以下是简单的API代码:

from fastapi import FastAPI, Response
import json

app = FastAPI()
#comp holds an in memory cache, thats why its created in global scope
comp = Compute()
@app.get("/compute")
def compute(task_ids):
result = comp.multi_compute(task_ids)
return Response(content=json.dumps(result, default=str), media_type="application/json")

当与这样的多个工作人员一起运行时:

uvicorn compute_api:app --host 0.0.0.0 --port 7000 --workers 2

我得到这个python错误

TypeError: can't pickle _thread.lock objects

只需一个工人流程就可以了。该程序在UNIX/LINUX操作系统上运行。

有人能向我解释一下,为什么一个新工艺的分叉在这里的多个uvicorn工艺中是不可能的,以及为什么我会遇到这种情况吗?

最终应该实现的很简单:

uvicorn进程,该进程生成多个其他进程(子进程通过fork(与该uvicorn过程的记忆副本。执行cpu绑定任务。

类型错误:无法pickle_thread.lock对象

源于您在中传递到子流程的任何数据

p = mp.Process(target=self.single_compute, args=single_comp_data)

包含不可拾取的对象。

发送到multiprocessing子流程的所有args/kwargs(无论是通过Process还是Pool中的更高级别方法(都必须是可拾取的,同样,函数运行的返回值也必须是可选择的,以便将其发送回父流程。

如果您在UNIX上,并且使用fork启动方法进行多处理(这是Linux上的默认方法,但在macOS上不是(,您还可以利用写时复制内存语义来避免"复制";向下";通过使数据可用,例如通过实例状态、全局变量。。。,在生成子流程之前,让它通过引用获取它,而不是将数据本身作为参数向下传递。

这个示例使用imap_unordered来提高性能(假设不需要按顺序处理ID(,并将返回一个dict,将输入ID映射到它创建的结果。

class Compute:
_cache = {}  # could be an instance variable too but whatever
def get_data(self, id):
if id not in self._cache:
self._cache[id] = get_data_from_somewhere(id)
return self._cache[id]
def compute_item(self, id):
data = self.get_data(id)
result = 42  # ... do heavy computation here ...
return (id, result)
def compute_result(self, ids) -> dict:
for id in ids:
self.get_data(id)  # populate in parent process
with multiprocessing.Pool() as p:
return dict(p.imap_unordered(self.compute_item, ids))

最新更新