将带有对象的函数传递到concurrent.forets.ProcessPoolExecutor()中



需要帮助将对象传递到cpu_bound函数中。该程序同时使用asyncio多处理,因此如果您同时了解这两种方法,这将是最好的帮助!

基本上问题出现在:result = loop.run_in_executor(pool, lambda: cpu_bound(list1, list2, int_var)

我无法将lambda函数传递到池中,并且使用以下代码编程错误:_pickle.PicklingError: Can't pickle <function <lambda> at 0x00000230FDEDD700>: attribute lookup <lambda> on __main__ failed

这是我的程序的模拟结构,因为整个程序有2000多行代码:

import ...
# Defining some functions...
.
def cpu_bound(list1, list2, int_var):
# Some CPU-bound calculations...
.
async def find_trades(session, list3, list4):
# Some async function calls
.
with concurrent.futures.ProcessPoolExecutor() as pool:
result = loop.run_in_executor(
pool, dill.loads(dill.dumps(lambda: cpu_bound(list1, list2, int_var)))
try:
await asyncio.wait_for(
result, timeout=5
)
except asyncio.TimeoutError:
print("Took to long to compute!")
async def run():
# Some async function calls
.
await asyncio.gather(find_trades(session, list3, list4), ...)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

不幸的是,我对多处理相对陌生,可能不太了解将对象从主程序的循环传递到多处理部分所带来的限制

真的很感谢所有的帮助!

以下行:

loop.run_in_executor(
pool, dill.loads(dill.dumps(lambda: cpu_bound(list1, list2, int_var)))

似乎没有多大意义。您正在使用dill序列化lambda,但在它有机会传输到子流程之前,您将立即将它反序列化回lambda。这可能就是为什么你会从pickle中得到一个错误,尽管你试图使用dill。

但是,通过将预期参数作为位置参数传递给run_in_executor:,您可以首先避免lambda

result = loop.run_in_executor(pool, cpu_bound, list1, list2, int_var)

如果列表中包含可拾取的对象,这应该可以正常工作。

相关内容

  • 没有找到相关文章

最新更新