如何正确地将同步函数转换为异步函数?



我正在编写一个电报机器人,我需要这个机器人对用户可用,即使它正在处理一些先前的请求。我的机器人下载了一些视频,如果超过了大小限制,就会对它们进行压缩,所以处理请求需要一些时间。我想把我的同步函数变成异步函数,并在另一个进程中处理它们,以实现这一点。

我找到了一种方法来做到这一点,使用这篇文章,但它不适合我。这是我测试解决方案的代码:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import wraps, partial
executor = ProcessPoolExecutor()
def async_wrap(func):
@wraps(func)
async def run(*args, **kwargs):
loop = asyncio.get_running_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)

return run 
@async_wrap
def sync_func(a):
import time
time.sleep(10)
if __name__ == "__main__":
asyncio.run(sync_func(4))
结果,我得到了以下错误消息:
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/queues.py", line 245, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function sync_func at 0x7f2e333625f0>: it's not the same object as __main__.sync_func
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/mikhail/Projects/social_network_parsing_bot/processes.py", line 34, in <module>
asyncio.run(sync_func(4))
File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/mikhail/Projects/social_network_parsing_bot/processes.py", line 18, in run
return await loop.run_in_executor(executor, pfunc)
File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/queues.py", line 245, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function sync_func at 0x7f2e333625f0>: it's not the same object as __main__.sync_func

据我所知,出现错误是因为decorator更改了函数,结果返回了一个新对象。我需要改变我的代码,使其工作。也许我不理解一些关键的概念,有一些简单的方法可以达到预期的效果。谢谢你的帮助

本文运行了一个很好的实验,但它实际上只是用于线程池执行器,而不是用于多处理的执行器。

如果你看到它的代码,在某些时候,它将executor=None传递给.run_in_executor调用,并且asyncio创建了一个默认的执行器,它是ThreadPoolExecutor。

与ProcessPoolExecutor的主要区别在于,所有跨进程移动的数据(因此,所有发送给worker的数据,包括目标函数)都必须被序列化——这是通过Python的pickle完成的。

现在,函数的Pickle序列化并没有真正发送函数对象及其字节码:相反,它只是发送函数qualname,并且期望另一端具有相同qualname的函数与原始函数相同。

在您的代码中,func是执行器池的目标,它是声明的函数,在它被包装在装饰器(__main__.sync_func)之前。但是在目标进程中与这个名称一起存在的是修饰后的函数。因此,如果Python不会因为函数不相同而阻塞它,那么您将进入一个无限循环,创建数百个嵌套子进程,并且永远不会真正调用您的函数-因为目标中的入口点将是包装函数。这只是你所浏览的文章中的一个错误。

说了这么多,让这一切工作的更简单的方法是,不以通常的方式使用这个装饰器,只是在模块命名空间中保留原始的、未修饰的函数,并为包装的函数创建一个新名称——这样,"raw";代码可以是执行器的目标:

(...)
def sync_func(a):
import time
time.sleep(2)
print(f"finished {a}")
# this creates the decorated function with a new name,
# instead of replacing the original:
wrapped_sync = async_wrap(sync_func)
if __name__ == "__main__":
asyncio.run(wrapped_sync("go go go"))

最新更新