使用本地定义的函数进行多处理
我正在为一个对外部依赖性非常挑剔的客户移植一个库。
该库中的大多数多处理都由pathos ProcessPool模块支持。主要原因是它可以非常容易地处理本地定义的函数。
我正在尝试在不强制这种依赖性(或者必须重写库的大块(的情况下恢复一些功能。我知道以下代码之所以有效,是因为函数是在顶级定义的:
import multiprocessing as mp
def f(x):
return x * x
def main():
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
main()
以下代码(这是我需要工作的(失败了,因为函数只在本地范围中定义:
import multiprocessing as mp
def main():
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
main()
有人知道这个不需要外部依赖的特定用例的一个好的解决方法吗?感谢阅读。
更新:
- 有一个使用fork的解决方案,但这对Mac和Windows不安全(感谢@Monica和@user2357112(
- @Blop提供了一个很好的建议,对很多人都有效。在我的情况下(不是上面的玩具示例(,生成器中的对象是不可分解的
- @amsh提供了一种变通方法,似乎适用于任何函数+生成器。虽然这是一个很好的选择,但缺点是它需要在全局范围内定义函数
主要问题是闭包变量。
如果你没有这些,可以这样做:
import marshal
import multiprocessing
import types
from functools import partial
def main():
def internal_func(c):
return c*c
with multiprocessing.Pool(5) as pool:
print(internal_func_map(pool, internal_func, [i for i in range(10)]))
def internal_func_map(pool, f, gen):
marshaled = marshal.dumps(f.__code__)
return pool.map(partial(run_func, marshaled=marshaled), gen)
def run_func(*args, **kwargs):
marshaled = kwargs.pop("marshaled")
func = marshal.loads(marshaled)
restored_f = types.FunctionType(func, globals())
return restored_f(*args, **kwargs)
if __name__ == "__main__":
main()
其思想是,函数代码拥有在新流程中运行它所需的一切。请注意,不需要任何外部依赖项,只需要常规的python库。
如果确实需要闭包,那么这个解决方案最困难的部分实际上是创建闭包。(在闭包中有一种叫做"细胞"的东西,它不太容易用代码创建…(
这是一个有点复杂的工作代码:
import marshal
import multiprocessing
import pickle
import types
from functools import partial
class A:
def __init__(self, a):
self.a = a
def main():
x = A(1)
def internal_func(c):
return x.a + c
with multiprocessing.Pool(5) as pool:
print(internal_func_map(pool, internal_func, [i for i in range(10)]))
def internal_func_map(pool, f, gen):
closure = f.__closure__
marshaled_func = marshal.dumps(f.__code__)
pickled_closure = pickle.dumps(tuple(x.cell_contents for x in closure))
return pool.map(partial(run_func, marshaled_func=marshaled_func, pickled_closure=pickled_closure), gen)
def run_func(*args, **kwargs):
marshaled_func = kwargs.pop("marshaled_func")
func = marshal.loads(marshaled_func)
pickled_closure = kwargs.pop("pickled_closure")
closure = pickle.loads(pickled_closure)
restored_f = types.FunctionType(func, globals(), closure=create_closure(func, closure))
return restored_f(*args, **kwargs)
def create_closure(func, original_closure):
indent = " " * 4
closure_vars_def = f"n{indent}".join(f"{name}=None" for name in func.co_freevars)
closure_vars_ref = ",".join(func.co_freevars)
dynamic_closure = "create_dynamic_closure"
s = (f"""
def {dynamic_closure}():
{closure_vars_def}
def internal():
{closure_vars_ref}
return internal.__closure__
""")
exec(s)
created_closure = locals()[dynamic_closure]()
for closure_var, value in zip(created_closure, original_closure):
closure_var.cell_contents = value
return created_closure
if __name__ == "__main__":
main()
希望这能帮助你,或者至少能给你一些解决这个问题的想法!
原始答案
免责声明:如果您想在本地定义函数以更好地管理代码,但可以接受其全局范围,则此答案适用
在定义函数之前,可以使用全局关键字。它将解决酸洗函数的问题(因为它现在是一个全局函数(,同时在局部范围内定义它。
import multiprocessing as mp
def main():
global f
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
main()
print(f(4)) #Inner function is available here as well.
输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
16
添加了另一个具有相同名称的多个函数的示例,每个后续函数都会覆盖上一个函数。
import multiprocessing as mp
def main():
global f
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
def main2():
global f
def f(x):
return x * x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
if __name__ == "__main__":
main()
main2()
print(f(4))
输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
64
更新的答案
调用映射后,撤消全局状态。感谢@KCQ在评论中的提示
为了确保全局函数不会对代码的其余部分造成任何问题,您可以简单地为全局函数添加del语句来撤销其全局状态。
import multiprocessing as mp
def main():
global f
def f(x):
return x * x
with mp.Pool(5) as p:
print(p.map(f, [i for i in range(10)]))
del f
if __name__ == "__main__":
main()
print(f(4)) #Inner function is not available.
输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Traceback (most recent call last):
File "<file>.py", line 25, in <module>
print(f(4))
NameError: name 'f' is not defined
尽管python会自动收集垃圾,但您也可以手动调用垃圾收集器。