我想写一个包装器,用于在异步中调用CPU需求函数。
我希望它像这样使用:
@cpu_bound
def fact(x: int):
res: int = 1
while x != 1:
res *= x
x -= 1
return res
async def foo(x: int):
res = await fact(x)
...
起初,我写道:
def cpu_bound(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
executor = get_executor() # This is a part where I implemented myself.
return await loop.run_in_executor(
executor, functools.partial(func, *args, **kwargs)
)
return wrapper
然而,我在腌制方面遇到了问题。
Traceback(上次调用(:文件"C: \Users\Lenovo\AppData\Local\Programs\Python39\lib\multiprocessing\queues.py";,第245行,输入obj=_ForkingPickler.dumps(obj(文件";C: \Users\Lenovo\AppData\Local\Programs\Python39\lib\multiprocessing\reduction.py";,第51行,转储cls(buf,protocol(.dump(obj(_pickle.PicklingError:无法pickle<0x000001C2D7D40820处的函数事实>:它与main不是同一个对象。事实
也许原始函数和封装的函数不具有相同的id
是问题所在?
那么,有没有办法写这样一个包装器呢?
我知道我可以使用loop.run_in_executor
,但有这样一个包装器会有很大帮助。
也许原始函数和封装的函数没有相同的id是问题所在?
在某种程度上,是的。在函数被发送到目标进程之前,它会被pickle,在您的情况下会失败,因为decorator重新绑定后,decorator作用域中的func
对象与主模块中的fact
对象不同。看看这个和这个问题,了解一些背景知识。
基于这些答案,我创建了一个关于如何实现你想要的东西的例子。诀窍是创建一个可拾取的";"转轮";函数,目标进程可以使用它从某种注册表(例如dict..(中查找您的原始函数并运行它。当然,这只是一个例子。您可能不想在decorator中创建ProcessPoolExecutor
。
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
original_functions={}
def func_runner(name, *args):
return original_functions[name](*args)
def cpu_bound(func):
original_functions[func.__name__]=func
@functools.wraps(func)
async def wrapper(*args):
with ProcessPoolExecutor(1) as pool:
res = await asyncio.get_running_loop().run_in_executor(
pool, functools.partial(func_runner, func.__name__, *args)
)
return res
return wrapper
@cpu_bound
def fact(arg):
return arg
async def foo():
res = await fact("ok")
print(res)
if __name__ == "__main__":
asyncio.run(foo())