在Python中使用本地函数进行多处理的解决方案



使用本地定义的函数进行多处理

我正在为一个对外部依赖性非常挑剔的客户移植一个库。

该库中的大多数多处理都由pathos ProcessPool模块支持。主要原因是它可以非常容易地处理本地定义的函数。

我正在尝试在不强制这种依赖性(或者必须重写库的大块(的情况下恢复一些功能。我知道以下代码之所以有效,是因为函数是在顶级定义的:

import multiprocessing as mp

def f(x):
return x * x

def main():
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
main()

以下代码(这是我需要工作的(失败了,因为函数只在本地范围中定义:

import multiprocessing as mp

def main():
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
main()

有人知道这个不需要外部依赖的特定用例的一个好的解决方法吗?感谢阅读。

更新:

  • 有一个使用fork的解决方案,但这对Mac和Windows不安全(感谢@Monica和@user2357112(
  • @Blop提供了一个很好的建议,对很多人都有效。在我的情况下(不是上面的玩具示例(,生成器中的对象是不可分解的
  • @amsh提供了一种变通方法,似乎适用于任何函数+生成器。虽然这是一个很好的选择,但缺点是它需要在全局范围内定义函数

主要问题是闭包变量。

如果你没有这些,可以这样做:

import marshal
import multiprocessing
import types
from functools import partial

def main():
def internal_func(c):
return c*c
with multiprocessing.Pool(5) as pool:
print(internal_func_map(pool, internal_func, [i for i in range(10)]))

def internal_func_map(pool, f, gen):
marshaled = marshal.dumps(f.__code__)
return pool.map(partial(run_func, marshaled=marshaled), gen)

def run_func(*args, **kwargs):
marshaled = kwargs.pop("marshaled")
func = marshal.loads(marshaled)
restored_f = types.FunctionType(func, globals())
return restored_f(*args, **kwargs)

if __name__ == "__main__":
main()

其思想是,函数代码拥有在新流程中运行它所需的一切。请注意,不需要任何外部依赖项,只需要常规的python库。

如果确实需要闭包,那么这个解决方案最困难的部分实际上是创建闭包。(在闭包中有一种叫做"细胞"的东西,它不太容易用代码创建…(

这是一个有点复杂的工作代码:

import marshal
import multiprocessing
import pickle
import types
from functools import partial

class A:
def __init__(self, a):
self.a = a

def main():
x = A(1)
def internal_func(c):
return x.a + c
with multiprocessing.Pool(5) as pool:
print(internal_func_map(pool, internal_func, [i for i in range(10)]))

def internal_func_map(pool, f, gen):
closure = f.__closure__
marshaled_func = marshal.dumps(f.__code__)
pickled_closure = pickle.dumps(tuple(x.cell_contents for x in closure))
return pool.map(partial(run_func, marshaled_func=marshaled_func, pickled_closure=pickled_closure), gen)

def run_func(*args, **kwargs):
marshaled_func = kwargs.pop("marshaled_func")
func = marshal.loads(marshaled_func)
pickled_closure = kwargs.pop("pickled_closure")
closure = pickle.loads(pickled_closure)
restored_f = types.FunctionType(func, globals(), closure=create_closure(func, closure))
return restored_f(*args, **kwargs)

def create_closure(func, original_closure):
indent = " " * 4
closure_vars_def = f"n{indent}".join(f"{name}=None" for name in func.co_freevars)
closure_vars_ref = ",".join(func.co_freevars)
dynamic_closure = "create_dynamic_closure"
s = (f"""
def {dynamic_closure}():
{closure_vars_def}
def internal():
{closure_vars_ref}
return internal.__closure__
""")
exec(s)
created_closure = locals()[dynamic_closure]()
for closure_var, value in zip(created_closure, original_closure):
closure_var.cell_contents = value
return created_closure

if __name__ == "__main__":
main()

希望这能帮助你,或者至少能给你一些解决这个问题的想法!

原始答案

免责声明:如果您想在本地定义函数以更好地管理代码,但可以接受其全局范围,则此答案适用

在定义函数之前,可以使用全局关键字。它将解决酸洗函数的问题(因为它现在是一个全局函数(,同时在局部范围内定义它。

import multiprocessing as mp
def main():
global f
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
main()
print(f(4)) #Inner function is available here as well.

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
16

添加了另一个具有相同名称的多个函数的示例,每个后续函数都会覆盖上一个函数。

import multiprocessing as mp
def main():
global f
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
def main2():
global f
def f(x):
return x * x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
main()
main2()
print(f(4))

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
64

更新的答案

调用映射后,撤消全局状态。感谢@KCQ在评论中的提示

为了确保全局函数不会对代码的其余部分造成任何问题,您可以简单地为全局函数添加del语句来撤销其全局状态。

import multiprocessing as mp
def main():
global f
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
del f
if __name__ == "__main__":
main()
print(f(4)) #Inner function is not available.

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Traceback (most recent call last):
File "<file>.py", line 25, in <module>
print(f(4))
NameError: name 'f' is not defined

尽管python会自动收集垃圾,但您也可以手动调用垃圾收集器。

相关内容

  • 没有找到相关文章

最新更新